Merge branch '20637-prefetch-threads' refs #20637
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 import copy
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
15 import collections
16 import datetime
17 import hashlib
18 import errno
19 import io
20 import logging
21 import math
22 import os
23 import pycurl
24 import queue
25 import re
26 import socket
27 import ssl
28 import sys
29 import threading
30 import resource
31 from . import timer
32 import urllib.parse
33 import traceback
34 import weakref
35
36 if sys.version_info >= (3, 0):
37     from io import BytesIO
38 else:
39     from cStringIO import StringIO as BytesIO
40
41 import arvados
42 import arvados.config as config
43 import arvados.errors
44 import arvados.retry as retry
45 import arvados.util
46 import arvados.diskcache
47 from arvados._pycurlhelper import PyCurlHelper
48
49 _logger = logging.getLogger('arvados.keep')
50 global_client_object = None
51
52
53 # Monkey patch TCP constants when not available (apple). Values sourced from:
54 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
55 if sys.platform == 'darwin':
56     if not hasattr(socket, 'TCP_KEEPALIVE'):
57         socket.TCP_KEEPALIVE = 0x010
58     if not hasattr(socket, 'TCP_KEEPINTVL'):
59         socket.TCP_KEEPINTVL = 0x101
60     if not hasattr(socket, 'TCP_KEEPCNT'):
61         socket.TCP_KEEPCNT = 0x102
62
63
64 class KeepLocator(object):
65     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
66     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
67
68     def __init__(self, locator_str):
69         self.hints = []
70         self._perm_sig = None
71         self._perm_expiry = None
72         pieces = iter(locator_str.split('+'))
73         self.md5sum = next(pieces)
74         try:
75             self.size = int(next(pieces))
76         except StopIteration:
77             self.size = None
78         for hint in pieces:
79             if self.HINT_RE.match(hint) is None:
80                 raise ValueError("invalid hint format: {}".format(hint))
81             elif hint.startswith('A'):
82                 self.parse_permission_hint(hint)
83             else:
84                 self.hints.append(hint)
85
86     def __str__(self):
87         return '+'.join(
88             native_str(s)
89             for s in [self.md5sum, self.size,
90                       self.permission_hint()] + self.hints
91             if s is not None)
92
93     def stripped(self):
94         if self.size is not None:
95             return "%s+%i" % (self.md5sum, self.size)
96         else:
97             return self.md5sum
98
99     def _make_hex_prop(name, length):
100         # Build and return a new property with the given name that
101         # must be a hex string of the given length.
102         data_name = '_{}'.format(name)
103         def getter(self):
104             return getattr(self, data_name)
105         def setter(self, hex_str):
106             if not arvados.util.is_hex(hex_str, length):
107                 raise ValueError("{} is not a {}-digit hex string: {!r}".
108                                  format(name, length, hex_str))
109             setattr(self, data_name, hex_str)
110         return property(getter, setter)
111
112     md5sum = _make_hex_prop('md5sum', 32)
113     perm_sig = _make_hex_prop('perm_sig', 40)
114
115     @property
116     def perm_expiry(self):
117         return self._perm_expiry
118
119     @perm_expiry.setter
120     def perm_expiry(self, value):
121         if not arvados.util.is_hex(value, 1, 8):
122             raise ValueError(
123                 "permission timestamp must be a hex Unix timestamp: {}".
124                 format(value))
125         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
126
127     def permission_hint(self):
128         data = [self.perm_sig, self.perm_expiry]
129         if None in data:
130             return None
131         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
132         return "A{}@{:08x}".format(*data)
133
134     def parse_permission_hint(self, s):
135         try:
136             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
137         except IndexError:
138             raise ValueError("bad permission hint {}".format(s))
139
140     def permission_expired(self, as_of_dt=None):
141         if self.perm_expiry is None:
142             return False
143         elif as_of_dt is None:
144             as_of_dt = datetime.datetime.now()
145         return self.perm_expiry <= as_of_dt
146
147
148 class Keep(object):
149     """Simple interface to a global KeepClient object.
150
151     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
152     own API client.  The global KeepClient will build an API client from the
153     current Arvados configuration, which may not match the one you built.
154     """
155     _last_key = None
156
157     @classmethod
158     def global_client_object(cls):
159         global global_client_object
160         # Previously, KeepClient would change its behavior at runtime based
161         # on these configuration settings.  We simulate that behavior here
162         # by checking the values and returning a new KeepClient if any of
163         # them have changed.
164         key = (config.get('ARVADOS_API_HOST'),
165                config.get('ARVADOS_API_TOKEN'),
166                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
167                config.get('ARVADOS_KEEP_PROXY'),
168                os.environ.get('KEEP_LOCAL_STORE'))
169         if (global_client_object is None) or (cls._last_key != key):
170             global_client_object = KeepClient()
171             cls._last_key = key
172         return global_client_object
173
174     @staticmethod
175     def get(locator, **kwargs):
176         return Keep.global_client_object().get(locator, **kwargs)
177
178     @staticmethod
179     def put(data, **kwargs):
180         return Keep.global_client_object().put(data, **kwargs)
181
182 class KeepBlockCache(object):
183     def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
184         self.cache_max = cache_max
185         self._cache = []
186         self._cache_lock = threading.Lock()
187         self._max_slots = max_slots
188         self._disk_cache = disk_cache
189         self._disk_cache_dir = disk_cache_dir
190         self._cache_updating = threading.Condition(self._cache_lock)
191
192         if self._disk_cache and self._disk_cache_dir is None:
193             self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
194             os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
195
196         if self._max_slots == 0:
197             if self._disk_cache:
198                 # Each block uses two file descriptors, one used to
199                 # open it initially and hold the flock(), and a second
200                 # hidden one used by mmap().
201                 #
202                 # Set max slots to 1/8 of maximum file handles.  This
203                 # means we'll use at most 1/4 of total file handles.
204                 #
205                 # NOFILE typically defaults to 1024 on Linux so this
206                 # is 128 slots (256 file handles), which means we can
207                 # cache up to 8 GiB of 64 MiB blocks.  This leaves
208                 # 768 file handles for sockets and other stuff.
209                 #
210                 # When we want the ability to have more cache (e.g. in
211                 # arv-mount) we'll increase rlimit before calling
212                 # this.
213                 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
214             else:
215                 # RAM cache slots
216                 self._max_slots = 512
217
218         if self.cache_max == 0:
219             if self._disk_cache:
220                 fs = os.statvfs(self._disk_cache_dir)
221                 # Calculation of available space incorporates existing cache usage
222                 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
223                 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
224                 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
225                 # pick smallest of:
226                 # 10% of total disk size
227                 # 25% of available space
228                 # max_slots * 64 MiB
229                 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
230             else:
231                 # 256 MiB in RAM
232                 self.cache_max = (256 * 1024 * 1024)
233
234         self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
235
236         if self._disk_cache:
237             self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
238             self.cap_cache()
239
240
241     class CacheSlot(object):
242         __slots__ = ("locator", "ready", "content")
243
244         def __init__(self, locator):
245             self.locator = locator
246             self.ready = threading.Event()
247             self.content = None
248
249         def get(self):
250             self.ready.wait()
251             return self.content
252
253         def set(self, value):
254             self.content = value
255             self.ready.set()
256
257         def size(self):
258             if self.content is None:
259                 return 0
260             else:
261                 return len(self.content)
262
263         def evict(self):
264             self.content = None
265             return self.gone()
266
267         def gone(self):
268             return (self.content is None)
269
270     def _resize_cache(self, cache_max, max_slots):
271         # Try and make sure the contents of the cache do not exceed
272         # the supplied maximums.
273
274         # Select all slots except those where ready.is_set() and content is
275         # None (that means there was an error reading the block).
276         self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
277         sm = sum([slot.size() for slot in self._cache])
278         while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
279             for i in range(len(self._cache)-1, -1, -1):
280                 # start from the back, find a slot that is a candidate to evict
281                 if self._cache[i].ready.is_set():
282                     sz = self._cache[i].size()
283
284                     # If evict returns false it means the
285                     # underlying disk cache couldn't lock the file
286                     # for deletion because another process was using
287                     # it. Don't count it as reducing the amount
288                     # of data in the cache, find something else to
289                     # throw out.
290                     if self._cache[i].evict():
291                         sm -= sz
292
293                     # check to make sure the underlying data is gone
294                     if self._cache[i].gone():
295                         # either way we forget about it.  either the
296                         # other process will delete it, or if we need
297                         # it again and it is still there, we'll find
298                         # it on disk.
299                         del self._cache[i]
300                     break
301
302
303     def cap_cache(self):
304         '''Cap the cache size to self.cache_max'''
305         with self._cache_updating:
306             self._resize_cache(self.cache_max, self._max_slots)
307             self._cache_updating.notify_all()
308
309     def _get(self, locator):
310         # Test if the locator is already in the cache
311         for i in range(0, len(self._cache)):
312             if self._cache[i].locator == locator:
313                 n = self._cache[i]
314                 if i != 0:
315                     # move it to the front
316                     del self._cache[i]
317                     self._cache.insert(0, n)
318                 return n
319         if self._disk_cache:
320             # see if it exists on disk
321             n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
322             if n is not None:
323                 self._cache.insert(0, n)
324                 return n
325         return None
326
327     def get(self, locator):
328         with self._cache_lock:
329             return self._get(locator)
330
331     def reserve_cache(self, locator):
332         '''Reserve a cache slot for the specified locator,
333         or return the existing slot.'''
334         with self._cache_updating:
335             n = self._get(locator)
336             if n:
337                 return n, False
338             else:
339                 # Add a new cache slot for the locator
340                 self._resize_cache(self.cache_max, self._max_slots-1)
341                 while len(self._cache) >= self._max_slots:
342                     # If there isn't a slot available, need to wait
343                     # for something to happen that releases one of the
344                     # cache slots.  Idle for 200 ms or woken up by
345                     # another thread
346                     self._cache_updating.wait(timeout=0.2)
347                     self._resize_cache(self.cache_max, self._max_slots-1)
348
349                 if self._disk_cache:
350                     n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
351                 else:
352                     n = KeepBlockCache.CacheSlot(locator)
353                 self._cache.insert(0, n)
354                 return n, True
355
356     def set(self, slot, blob):
357         try:
358             slot.set(blob)
359             return
360         except OSError as e:
361             if e.errno == errno.ENOMEM:
362                 # Reduce max slots to current - 4, cap cache and retry
363                 with self._cache_lock:
364                     self._max_slots = max(4, len(self._cache) - 4)
365             elif e.errno == errno.ENOSPC:
366                 # Reduce disk max space to current - 256 MiB, cap cache and retry
367                 with self._cache_lock:
368                     sm = sum([st.size() for st in self._cache])
369                     self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
370             elif e.errno == errno.ENODEV:
371                 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
372         except Exception as e:
373             pass
374         finally:
375             # Check if we should evict things from the cache.  Either
376             # because we added a new thing or there was an error and
377             # we possibly adjusted the limits down, so we might need
378             # to push something out.
379             self.cap_cache()
380
381         try:
382             # Only gets here if there was an error the first time. The
383             # exception handler adjusts limits downward in some cases
384             # to free up resources, which would make the operation
385             # succeed.
386             slot.set(blob)
387         except Exception as e:
388             # It failed again.  Give up.
389             slot.set(None)
390             raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
391
392         self.cap_cache()
393
394 class Counter(object):
395     def __init__(self, v=0):
396         self._lk = threading.Lock()
397         self._val = v
398
399     def add(self, v):
400         with self._lk:
401             self._val += v
402
403     def get(self):
404         with self._lk:
405             return self._val
406
407
408 class KeepClient(object):
409     DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
410     DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
411
412     class KeepService(PyCurlHelper):
413         """Make requests to a single Keep service, and track results.
414
415         A KeepService is intended to last long enough to perform one
416         transaction (GET or PUT) against one Keep service. This can
417         involve calling either get() or put() multiple times in order
418         to retry after transient failures. However, calling both get()
419         and put() on a single instance -- or using the same instance
420         to access two different Keep services -- will not produce
421         sensible behavior.
422         """
423
424         HTTP_ERRORS = (
425             socket.error,
426             ssl.SSLError,
427             arvados.errors.HttpError,
428         )
429
430         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
431                      upload_counter=None,
432                      download_counter=None,
433                      headers={},
434                      insecure=False):
435             super(KeepClient.KeepService, self).__init__()
436             self.root = root
437             self._user_agent_pool = user_agent_pool
438             self._result = {'error': None}
439             self._usable = True
440             self._session = None
441             self._socket = None
442             self.get_headers = {'Accept': 'application/octet-stream'}
443             self.get_headers.update(headers)
444             self.put_headers = headers
445             self.upload_counter = upload_counter
446             self.download_counter = download_counter
447             self.insecure = insecure
448
449         def usable(self):
450             """Is it worth attempting a request?"""
451             return self._usable
452
453         def finished(self):
454             """Did the request succeed or encounter permanent failure?"""
455             return self._result['error'] == False or not self._usable
456
457         def last_result(self):
458             return self._result
459
460         def _get_user_agent(self):
461             try:
462                 return self._user_agent_pool.get(block=False)
463             except queue.Empty:
464                 return pycurl.Curl()
465
466         def _put_user_agent(self, ua):
467             try:
468                 ua.reset()
469                 self._user_agent_pool.put(ua, block=False)
470             except:
471                 ua.close()
472
473         def get(self, locator, method="GET", timeout=None):
474             # locator is a KeepLocator object.
475             url = self.root + str(locator)
476             _logger.debug("Request: %s %s", method, url)
477             curl = self._get_user_agent()
478             ok = None
479             try:
480                 with timer.Timer() as t:
481                     self._headers = {}
482                     response_body = BytesIO()
483                     curl.setopt(pycurl.NOSIGNAL, 1)
484                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
485                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
486                     curl.setopt(pycurl.URL, url.encode('utf-8'))
487                     curl.setopt(pycurl.HTTPHEADER, [
488                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
489                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
490                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
491                     if self.insecure:
492                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
493                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
494                     else:
495                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
496                     if method == "HEAD":
497                         curl.setopt(pycurl.NOBODY, True)
498                     else:
499                         curl.setopt(pycurl.HTTPGET, True)
500                     self._setcurltimeouts(curl, timeout, method=="HEAD")
501
502                     try:
503                         curl.perform()
504                     except Exception as e:
505                         raise arvados.errors.HttpError(0, str(e))
506                     finally:
507                         if self._socket:
508                             self._socket.close()
509                             self._socket = None
510                     self._result = {
511                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
512                         'body': response_body.getvalue(),
513                         'headers': self._headers,
514                         'error': False,
515                     }
516
517                 ok = retry.check_http_response_success(self._result['status_code'])
518                 if not ok:
519                     self._result['error'] = arvados.errors.HttpError(
520                         self._result['status_code'],
521                         self._headers.get('x-status-line', 'Error'))
522             except self.HTTP_ERRORS as e:
523                 self._result = {
524                     'error': e,
525                 }
526             self._usable = ok != False
527             if self._result.get('status_code', None):
528                 # The client worked well enough to get an HTTP status
529                 # code, so presumably any problems are just on the
530                 # server side and it's OK to reuse the client.
531                 self._put_user_agent(curl)
532             else:
533                 # Don't return this client to the pool, in case it's
534                 # broken.
535                 curl.close()
536             if not ok:
537                 _logger.debug("Request fail: GET %s => %s: %s",
538                               url, type(self._result['error']), str(self._result['error']))
539                 return None
540             if method == "HEAD":
541                 _logger.info("HEAD %s: %s bytes",
542                          self._result['status_code'],
543                          self._result.get('content-length'))
544                 if self._result['headers'].get('x-keep-locator'):
545                     # This is a response to a remote block copy request, return
546                     # the local copy block locator.
547                     return self._result['headers'].get('x-keep-locator')
548                 return True
549
550             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
551                          self._result['status_code'],
552                          len(self._result['body']),
553                          t.msecs,
554                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
555
556             if self.download_counter:
557                 self.download_counter.add(len(self._result['body']))
558             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
559             if resp_md5 != locator.md5sum:
560                 _logger.warning("Checksum fail: md5(%s) = %s",
561                                 url, resp_md5)
562                 self._result['error'] = arvados.errors.HttpError(
563                     0, 'Checksum fail')
564                 return None
565             return self._result['body']
566
567         def put(self, hash_s, body, timeout=None, headers={}):
568             put_headers = copy.copy(self.put_headers)
569             put_headers.update(headers)
570             url = self.root + hash_s
571             _logger.debug("Request: PUT %s", url)
572             curl = self._get_user_agent()
573             ok = None
574             try:
575                 with timer.Timer() as t:
576                     self._headers = {}
577                     body_reader = BytesIO(body)
578                     response_body = BytesIO()
579                     curl.setopt(pycurl.NOSIGNAL, 1)
580                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
581                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
582                     curl.setopt(pycurl.URL, url.encode('utf-8'))
583                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
584                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
585                     # response) instead of sending the request body immediately.
586                     # This allows the server to reject the request if the request
587                     # is invalid or the server is read-only, without waiting for
588                     # the client to send the entire block.
589                     curl.setopt(pycurl.UPLOAD, True)
590                     curl.setopt(pycurl.INFILESIZE, len(body))
591                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
592                     curl.setopt(pycurl.HTTPHEADER, [
593                         '{}: {}'.format(k,v) for k,v in put_headers.items()])
594                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
595                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
596                     if self.insecure:
597                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
598                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
599                     else:
600                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
601                     self._setcurltimeouts(curl, timeout)
602                     try:
603                         curl.perform()
604                     except Exception as e:
605                         raise arvados.errors.HttpError(0, str(e))
606                     finally:
607                         if self._socket:
608                             self._socket.close()
609                             self._socket = None
610                     self._result = {
611                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
612                         'body': response_body.getvalue().decode('utf-8'),
613                         'headers': self._headers,
614                         'error': False,
615                     }
616                 ok = retry.check_http_response_success(self._result['status_code'])
617                 if not ok:
618                     self._result['error'] = arvados.errors.HttpError(
619                         self._result['status_code'],
620                         self._headers.get('x-status-line', 'Error'))
621             except self.HTTP_ERRORS as e:
622                 self._result = {
623                     'error': e,
624                 }
625             self._usable = ok != False # still usable if ok is True or None
626             if self._result.get('status_code', None):
627                 # Client is functional. See comment in get().
628                 self._put_user_agent(curl)
629             else:
630                 curl.close()
631             if not ok:
632                 _logger.debug("Request fail: PUT %s => %s: %s",
633                               url, type(self._result['error']), str(self._result['error']))
634                 return False
635             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
636                          self._result['status_code'],
637                          len(body),
638                          t.msecs,
639                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
640             if self.upload_counter:
641                 self.upload_counter.add(len(body))
642             return True
643
644
645     class KeepWriterQueue(queue.Queue):
646         def __init__(self, copies, classes=[]):
647             queue.Queue.__init__(self) # Old-style superclass
648             self.wanted_copies = copies
649             self.wanted_storage_classes = classes
650             self.successful_copies = 0
651             self.confirmed_storage_classes = {}
652             self.response = None
653             self.storage_classes_tracking = True
654             self.queue_data_lock = threading.RLock()
655             self.pending_tries = max(copies, len(classes))
656             self.pending_tries_notification = threading.Condition()
657
658         def write_success(self, response, replicas_nr, classes_confirmed):
659             with self.queue_data_lock:
660                 self.successful_copies += replicas_nr
661                 if classes_confirmed is None:
662                     self.storage_classes_tracking = False
663                 elif self.storage_classes_tracking:
664                     for st_class, st_copies in classes_confirmed.items():
665                         try:
666                             self.confirmed_storage_classes[st_class] += st_copies
667                         except KeyError:
668                             self.confirmed_storage_classes[st_class] = st_copies
669                     self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
670                 self.response = response
671             with self.pending_tries_notification:
672                 self.pending_tries_notification.notify_all()
673
674         def write_fail(self, ks):
675             with self.pending_tries_notification:
676                 self.pending_tries += 1
677                 self.pending_tries_notification.notify()
678
679         def pending_copies(self):
680             with self.queue_data_lock:
681                 return self.wanted_copies - self.successful_copies
682
683         def satisfied_classes(self):
684             with self.queue_data_lock:
685                 if not self.storage_classes_tracking:
686                     # Notifies disabled storage classes expectation to
687                     # the outer loop.
688                     return None
689             return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
690
691         def pending_classes(self):
692             with self.queue_data_lock:
693                 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
694                     return []
695                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
696                 for st_class, st_copies in self.confirmed_storage_classes.items():
697                     if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
698                         unsatisfied_classes.remove(st_class)
699                 return unsatisfied_classes
700
701         def get_next_task(self):
702             with self.pending_tries_notification:
703                 while True:
704                     if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
705                         # This notify_all() is unnecessary --
706                         # write_success() already called notify_all()
707                         # when pending<1 became true, so it's not
708                         # possible for any other thread to be in
709                         # wait() now -- but it's cheap insurance
710                         # against deadlock so we do it anyway:
711                         self.pending_tries_notification.notify_all()
712                         # Drain the queue and then raise Queue.Empty
713                         while True:
714                             self.get_nowait()
715                             self.task_done()
716                     elif self.pending_tries > 0:
717                         service, service_root = self.get_nowait()
718                         if service.finished():
719                             self.task_done()
720                             continue
721                         self.pending_tries -= 1
722                         return service, service_root
723                     elif self.empty():
724                         self.pending_tries_notification.notify_all()
725                         raise queue.Empty
726                     else:
727                         self.pending_tries_notification.wait()
728
729
730     class KeepWriterThreadPool(object):
731         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
732             self.total_task_nr = 0
733             if (not max_service_replicas) or (max_service_replicas >= copies):
734                 num_threads = 1
735             else:
736                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
737             _logger.debug("Pool max threads is %d", num_threads)
738             self.workers = []
739             self.queue = KeepClient.KeepWriterQueue(copies, classes)
740             # Create workers
741             for _ in range(num_threads):
742                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
743                 self.workers.append(w)
744
745         def add_task(self, ks, service_root):
746             self.queue.put((ks, service_root))
747             self.total_task_nr += 1
748
749         def done(self):
750             return self.queue.successful_copies, self.queue.satisfied_classes()
751
752         def join(self):
753             # Start workers
754             for worker in self.workers:
755                 worker.start()
756             # Wait for finished work
757             self.queue.join()
758
759         def response(self):
760             return self.queue.response
761
762
763     class KeepWriterThread(threading.Thread):
764         class TaskFailed(RuntimeError): pass
765
766         def __init__(self, queue, data, data_hash, timeout=None):
767             super(KeepClient.KeepWriterThread, self).__init__()
768             self.timeout = timeout
769             self.queue = queue
770             self.data = data
771             self.data_hash = data_hash
772             self.daemon = True
773
774         def run(self):
775             while True:
776                 try:
777                     service, service_root = self.queue.get_next_task()
778                 except queue.Empty:
779                     return
780                 try:
781                     locator, copies, classes = self.do_task(service, service_root)
782                 except Exception as e:
783                     if not isinstance(e, self.TaskFailed):
784                         _logger.exception("Exception in KeepWriterThread")
785                     self.queue.write_fail(service)
786                 else:
787                     self.queue.write_success(locator, copies, classes)
788                 finally:
789                     self.queue.task_done()
790
791         def do_task(self, service, service_root):
792             classes = self.queue.pending_classes()
793             headers = {}
794             if len(classes) > 0:
795                 classes.sort()
796                 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
797             success = bool(service.put(self.data_hash,
798                                         self.data,
799                                         timeout=self.timeout,
800                                         headers=headers))
801             result = service.last_result()
802
803             if not success:
804                 if result.get('status_code'):
805                     _logger.debug("Request fail: PUT %s => %s %s",
806                                   self.data_hash,
807                                   result.get('status_code'),
808                                   result.get('body'))
809                 raise self.TaskFailed()
810
811             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
812                           str(threading.current_thread()),
813                           self.data_hash,
814                           len(self.data),
815                           service_root)
816             try:
817                 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
818             except (KeyError, ValueError):
819                 replicas_stored = 1
820
821             classes_confirmed = {}
822             try:
823                 scch = result['headers']['x-keep-storage-classes-confirmed']
824                 for confirmation in scch.replace(' ', '').split(','):
825                     if '=' in confirmation:
826                         stored_class, stored_copies = confirmation.split('=')[:2]
827                         classes_confirmed[stored_class] = int(stored_copies)
828             except (KeyError, ValueError):
829                 # Storage classes confirmed header missing or corrupt
830                 classes_confirmed = None
831
832             return result['body'].strip(), replicas_stored, classes_confirmed
833
834
835     def __init__(self, api_client=None, proxy=None,
836                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
837                  api_token=None, local_store=None, block_cache=None,
838                  num_retries=10, session=None, num_prefetch_threads=None):
839         """Initialize a new KeepClient.
840
841         Arguments:
842         :api_client:
843           The API client to use to find Keep services.  If not
844           provided, KeepClient will build one from available Arvados
845           configuration.
846
847         :proxy:
848           If specified, this KeepClient will send requests to this Keep
849           proxy.  Otherwise, KeepClient will fall back to the setting of the
850           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
851           If you want to KeepClient does not use a proxy, pass in an empty
852           string.
853
854         :timeout:
855           The initial timeout (in seconds) for HTTP requests to Keep
856           non-proxy servers.  A tuple of three floats is interpreted as
857           (connection_timeout, read_timeout, minimum_bandwidth). A connection
858           will be aborted if the average traffic rate falls below
859           minimum_bandwidth bytes per second over an interval of read_timeout
860           seconds. Because timeouts are often a result of transient server
861           load, the actual connection timeout will be increased by a factor
862           of two on each retry.
863           Default: (2, 256, 32768).
864
865         :proxy_timeout:
866           The initial timeout (in seconds) for HTTP requests to
867           Keep proxies. A tuple of three floats is interpreted as
868           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
869           described above for adjusting connection timeouts on retry also
870           applies.
871           Default: (20, 256, 32768).
872
873         :api_token:
874           If you're not using an API client, but only talking
875           directly to a Keep proxy, this parameter specifies an API token
876           to authenticate Keep requests.  It is an error to specify both
877           api_client and api_token.  If you specify neither, KeepClient
878           will use one available from the Arvados configuration.
879
880         :local_store:
881           If specified, this KeepClient will bypass Keep
882           services, and save data to the named directory.  If unspecified,
883           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
884           environment variable.  If you want to ensure KeepClient does not
885           use local storage, pass in an empty string.  This is primarily
886           intended to mock a server for testing.
887
888         :num_retries:
889           The default number of times to retry failed requests.
890           This will be used as the default num_retries value when get() and
891           put() are called.  Default 10.
892         """
893         self.lock = threading.Lock()
894         if proxy is None:
895             if config.get('ARVADOS_KEEP_SERVICES'):
896                 proxy = config.get('ARVADOS_KEEP_SERVICES')
897             else:
898                 proxy = config.get('ARVADOS_KEEP_PROXY')
899         if api_token is None:
900             if api_client is None:
901                 api_token = config.get('ARVADOS_API_TOKEN')
902             else:
903                 api_token = api_client.api_token
904         elif api_client is not None:
905             raise ValueError(
906                 "can't build KeepClient with both API client and token")
907         if local_store is None:
908             local_store = os.environ.get('KEEP_LOCAL_STORE')
909
910         if api_client is None:
911             self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
912         else:
913             self.insecure = api_client.insecure
914
915         self.block_cache = block_cache if block_cache else KeepBlockCache()
916         self.timeout = timeout
917         self.proxy_timeout = proxy_timeout
918         self._user_agent_pool = queue.LifoQueue()
919         self.upload_counter = Counter()
920         self.download_counter = Counter()
921         self.put_counter = Counter()
922         self.get_counter = Counter()
923         self.hits_counter = Counter()
924         self.misses_counter = Counter()
925         self._storage_classes_unsupported_warning = False
926         self._default_classes = []
927         self.num_prefetch_threads = num_prefetch_threads or 2
928         self._prefetch_queue = None
929         self._prefetch_threads = None
930
931         if local_store:
932             self.local_store = local_store
933             self.head = self.local_store_head
934             self.get = self.local_store_get
935             self.put = self.local_store_put
936         else:
937             self.num_retries = num_retries
938             self.max_replicas_per_service = None
939             if proxy:
940                 proxy_uris = proxy.split()
941                 for i in range(len(proxy_uris)):
942                     if not proxy_uris[i].endswith('/'):
943                         proxy_uris[i] += '/'
944                     # URL validation
945                     url = urllib.parse.urlparse(proxy_uris[i])
946                     if not (url.scheme and url.netloc):
947                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
948                 self.api_token = api_token
949                 self._gateway_services = {}
950                 self._keep_services = [{
951                     'uuid': "00000-bi6l4-%015d" % idx,
952                     'service_type': 'proxy',
953                     '_service_root': uri,
954                     } for idx, uri in enumerate(proxy_uris)]
955                 self._writable_services = self._keep_services
956                 self.using_proxy = True
957                 self._static_services_list = True
958             else:
959                 # It's important to avoid instantiating an API client
960                 # unless we actually need one, for testing's sake.
961                 if api_client is None:
962                     api_client = arvados.api('v1')
963                 self.api_client = api_client
964                 self.api_token = api_client.api_token
965                 self._gateway_services = {}
966                 self._keep_services = None
967                 self._writable_services = None
968                 self.using_proxy = None
969                 self._static_services_list = False
970                 try:
971                     self._default_classes = [
972                         k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
973                 except KeyError:
974                     # We're talking to an old cluster
975                     pass
976
977     def current_timeout(self, attempt_number):
978         """Return the appropriate timeout to use for this client.
979
980         The proxy timeout setting if the backend service is currently a proxy,
981         the regular timeout setting otherwise.  The `attempt_number` indicates
982         how many times the operation has been tried already (starting from 0
983         for the first try), and scales the connection timeout portion of the
984         return value accordingly.
985
986         """
987         # TODO(twp): the timeout should be a property of a
988         # KeepService, not a KeepClient. See #4488.
989         t = self.proxy_timeout if self.using_proxy else self.timeout
990         if len(t) == 2:
991             return (t[0] * (1 << attempt_number), t[1])
992         else:
993             return (t[0] * (1 << attempt_number), t[1], t[2])
994     def _any_nondisk_services(self, service_list):
995         return any(ks.get('service_type', 'disk') != 'disk'
996                    for ks in service_list)
997
998     def build_services_list(self, force_rebuild=False):
999         if (self._static_services_list or
1000               (self._keep_services and not force_rebuild)):
1001             return
1002         with self.lock:
1003             try:
1004                 keep_services = self.api_client.keep_services().accessible()
1005             except Exception:  # API server predates Keep services.
1006                 keep_services = self.api_client.keep_disks().list()
1007
1008             # Gateway services are only used when specified by UUID,
1009             # so there's nothing to gain by filtering them by
1010             # service_type.
1011             self._gateway_services = {ks['uuid']: ks for ks in
1012                                       keep_services.execute()['items']}
1013             if not self._gateway_services:
1014                 raise arvados.errors.NoKeepServersError()
1015
1016             # Precompute the base URI for each service.
1017             for r in self._gateway_services.values():
1018                 host = r['service_host']
1019                 if not host.startswith('[') and host.find(':') >= 0:
1020                     # IPv6 URIs must be formatted like http://[::1]:80/...
1021                     host = '[' + host + ']'
1022                 r['_service_root'] = "{}://{}:{:d}/".format(
1023                     'https' if r['service_ssl_flag'] else 'http',
1024                     host,
1025                     r['service_port'])
1026
1027             _logger.debug(str(self._gateway_services))
1028             self._keep_services = [
1029                 ks for ks in self._gateway_services.values()
1030                 if not ks.get('service_type', '').startswith('gateway:')]
1031             self._writable_services = [ks for ks in self._keep_services
1032                                        if not ks.get('read_only')]
1033
1034             # For disk type services, max_replicas_per_service is 1
1035             # It is unknown (unlimited) for other service types.
1036             if self._any_nondisk_services(self._writable_services):
1037                 self.max_replicas_per_service = None
1038             else:
1039                 self.max_replicas_per_service = 1
1040
1041     def _service_weight(self, data_hash, service_uuid):
1042         """Compute the weight of a Keep service endpoint for a data
1043         block with a known hash.
1044
1045         The weight is md5(h + u) where u is the last 15 characters of
1046         the service endpoint's UUID.
1047         """
1048         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1049
1050     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1051         """Return an array of Keep service endpoints, in the order in
1052         which they should be probed when reading or writing data with
1053         the given hash+hints.
1054         """
1055         self.build_services_list(force_rebuild)
1056
1057         sorted_roots = []
1058         # Use the services indicated by the given +K@... remote
1059         # service hints, if any are present and can be resolved to a
1060         # URI.
1061         for hint in locator.hints:
1062             if hint.startswith('K@'):
1063                 if len(hint) == 7:
1064                     sorted_roots.append(
1065                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1066                 elif len(hint) == 29:
1067                     svc = self._gateway_services.get(hint[2:])
1068                     if svc:
1069                         sorted_roots.append(svc['_service_root'])
1070
1071         # Sort the available local services by weight (heaviest first)
1072         # for this locator, and return their service_roots (base URIs)
1073         # in that order.
1074         use_services = self._keep_services
1075         if need_writable:
1076             use_services = self._writable_services
1077         self.using_proxy = self._any_nondisk_services(use_services)
1078         sorted_roots.extend([
1079             svc['_service_root'] for svc in sorted(
1080                 use_services,
1081                 reverse=True,
1082                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1083         _logger.debug("{}: {}".format(locator, sorted_roots))
1084         return sorted_roots
1085
1086     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1087         # roots_map is a dictionary, mapping Keep service root strings
1088         # to KeepService objects.  Poll for Keep services, and add any
1089         # new ones to roots_map.  Return the current list of local
1090         # root strings.
1091         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1092         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1093         for root in local_roots:
1094             if root not in roots_map:
1095                 roots_map[root] = self.KeepService(
1096                     root, self._user_agent_pool,
1097                     upload_counter=self.upload_counter,
1098                     download_counter=self.download_counter,
1099                     headers=headers,
1100                     insecure=self.insecure)
1101         return local_roots
1102
1103     @staticmethod
1104     def _check_loop_result(result):
1105         # KeepClient RetryLoops should save results as a 2-tuple: the
1106         # actual result of the request, and the number of servers available
1107         # to receive the request this round.
1108         # This method returns True if there's a real result, False if
1109         # there are no more servers available, otherwise None.
1110         if isinstance(result, Exception):
1111             return None
1112         result, tried_server_count = result
1113         if (result is not None) and (result is not False):
1114             return True
1115         elif tried_server_count < 1:
1116             _logger.info("No more Keep services to try; giving up")
1117             return False
1118         else:
1119             return None
1120
1121     def get_from_cache(self, loc_s):
1122         """Fetch a block only if is in the cache, otherwise return None."""
1123         locator = KeepLocator(loc_s)
1124         slot = self.block_cache.get(locator.md5sum)
1125         if slot is not None and slot.ready.is_set():
1126             return slot.get()
1127         else:
1128             return None
1129
1130     def refresh_signature(self, loc):
1131         """Ask Keep to get the remote block and return its local signature"""
1132         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1133         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1134
1135     @retry.retry_method
1136     def head(self, loc_s, **kwargs):
1137         return self._get_or_head(loc_s, method="HEAD", **kwargs)
1138
1139     @retry.retry_method
1140     def get(self, loc_s, **kwargs):
1141         return self._get_or_head(loc_s, method="GET", **kwargs)
1142
1143     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1144         """Get data from Keep.
1145
1146         This method fetches one or more blocks of data from Keep.  It
1147         sends a request each Keep service registered with the API
1148         server (or the proxy provided when this client was
1149         instantiated), then each service named in location hints, in
1150         sequence.  As soon as one service provides the data, it's
1151         returned.
1152
1153         Arguments:
1154         * loc_s: A string of one or more comma-separated locators to fetch.
1155           This method returns the concatenation of these blocks.
1156         * num_retries: The number of times to retry GET requests to
1157           *each* Keep server if it returns temporary failures, with
1158           exponential backoff.  Note that, in each loop, the method may try
1159           to fetch data from every available Keep service, along with any
1160           that are named in location hints in the locator.  The default value
1161           is set when the KeepClient is initialized.
1162         """
1163         if ',' in loc_s:
1164             return ''.join(self.get(x) for x in loc_s.split(','))
1165
1166         self.get_counter.add(1)
1167
1168         request_id = (request_id or
1169                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1170                       arvados.util.new_request_id())
1171         if headers is None:
1172             headers = {}
1173         headers['X-Request-Id'] = request_id
1174
1175         slot = None
1176         blob = None
1177         try:
1178             locator = KeepLocator(loc_s)
1179             if method == "GET":
1180                 while slot is None:
1181                     slot, first = self.block_cache.reserve_cache(locator.md5sum)
1182                     if first:
1183                         # Fresh and empty "first time it is used" slot
1184                         break
1185                     if prefetch:
1186                         # this is request for a prefetch to fill in
1187                         # the cache, don't need to wait for the
1188                         # result, so if it is already in flight return
1189                         # immediately.  Clear 'slot' to prevent
1190                         # finally block from calling slot.set()
1191                         slot = None
1192                         return None
1193
1194                     blob = slot.get()
1195                     if blob is not None:
1196                         self.hits_counter.add(1)
1197                         return blob
1198
1199                     # If blob is None, this means either
1200                     #
1201                     # (a) another thread was fetching this block and
1202                     # failed with an error or
1203                     #
1204                     # (b) cache thrashing caused the slot to be
1205                     # evicted (content set to None) by another thread
1206                     # between the call to reserve_cache() and get().
1207                     #
1208                     # We'll handle these cases by reserving a new slot
1209                     # and then doing a full GET request.
1210                     slot = None
1211
1212             self.misses_counter.add(1)
1213
1214             # If the locator has hints specifying a prefix (indicating a
1215             # remote keepproxy) or the UUID of a local gateway service,
1216             # read data from the indicated service(s) instead of the usual
1217             # list of local disk services.
1218             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1219                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1220             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1221                                for hint in locator.hints if (
1222                                        hint.startswith('K@') and
1223                                        len(hint) == 29 and
1224                                        self._gateway_services.get(hint[2:])
1225                                        )])
1226             # Map root URLs to their KeepService objects.
1227             roots_map = {
1228                 root: self.KeepService(root, self._user_agent_pool,
1229                                        upload_counter=self.upload_counter,
1230                                        download_counter=self.download_counter,
1231                                        headers=headers,
1232                                        insecure=self.insecure)
1233                 for root in hint_roots
1234             }
1235
1236             # See #3147 for a discussion of the loop implementation.  Highlights:
1237             # * Refresh the list of Keep services after each failure, in case
1238             #   it's being updated.
1239             # * Retry until we succeed, we're out of retries, or every available
1240             #   service has returned permanent failure.
1241             sorted_roots = []
1242             roots_map = {}
1243             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1244                                    backoff_start=2)
1245             for tries_left in loop:
1246                 try:
1247                     sorted_roots = self.map_new_services(
1248                         roots_map, locator,
1249                         force_rebuild=(tries_left < num_retries),
1250                         need_writable=False,
1251                         headers=headers)
1252                 except Exception as error:
1253                     loop.save_result(error)
1254                     continue
1255
1256                 # Query KeepService objects that haven't returned
1257                 # permanent failure, in our specified shuffle order.
1258                 services_to_try = [roots_map[root]
1259                                    for root in sorted_roots
1260                                    if roots_map[root].usable()]
1261                 for keep_service in services_to_try:
1262                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1263                     if blob is not None:
1264                         break
1265                 loop.save_result((blob, len(services_to_try)))
1266
1267             # Always cache the result, then return it if we succeeded.
1268             if loop.success():
1269                 return blob
1270         finally:
1271             if slot is not None:
1272                 self.block_cache.set(slot, blob)
1273
1274         # Q: Including 403 is necessary for the Keep tests to continue
1275         # passing, but maybe they should expect KeepReadError instead?
1276         not_founds = sum(1 for key in sorted_roots
1277                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1278         service_errors = ((key, roots_map[key].last_result()['error'])
1279                           for key in sorted_roots)
1280         if not roots_map:
1281             raise arvados.errors.KeepReadError(
1282                 "[{}] failed to read {}: no Keep services available ({})".format(
1283                     request_id, loc_s, loop.last_result()))
1284         elif not_founds == len(sorted_roots):
1285             raise arvados.errors.NotFoundError(
1286                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1287         else:
1288             raise arvados.errors.KeepReadError(
1289                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1290
1291     @retry.retry_method
1292     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1293         """Save data in Keep.
1294
1295         This method will get a list of Keep services from the API server, and
1296         send the data to each one simultaneously in a new thread.  Once the
1297         uploads are finished, if enough copies are saved, this method returns
1298         the most recent HTTP response body.  If requests fail to upload
1299         enough copies, this method raises KeepWriteError.
1300
1301         Arguments:
1302         * data: The string of data to upload.
1303         * copies: The number of copies that the user requires be saved.
1304           Default 2.
1305         * num_retries: The number of times to retry PUT requests to
1306           *each* Keep server if it returns temporary failures, with
1307           exponential backoff.  The default value is set when the
1308           KeepClient is initialized.
1309         * classes: An optional list of storage class names where copies should
1310           be written.
1311         """
1312
1313         classes = classes or self._default_classes
1314
1315         if not isinstance(data, bytes):
1316             data = data.encode()
1317
1318         self.put_counter.add(1)
1319
1320         data_hash = hashlib.md5(data).hexdigest()
1321         loc_s = data_hash + '+' + str(len(data))
1322         if copies < 1:
1323             return loc_s
1324         locator = KeepLocator(loc_s)
1325
1326         request_id = (request_id or
1327                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1328                       arvados.util.new_request_id())
1329         headers = {
1330             'X-Request-Id': request_id,
1331             'X-Keep-Desired-Replicas': str(copies),
1332         }
1333         roots_map = {}
1334         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1335                                backoff_start=2)
1336         done_copies = 0
1337         done_classes = []
1338         for tries_left in loop:
1339             try:
1340                 sorted_roots = self.map_new_services(
1341                     roots_map, locator,
1342                     force_rebuild=(tries_left < num_retries),
1343                     need_writable=True,
1344                     headers=headers)
1345             except Exception as error:
1346                 loop.save_result(error)
1347                 continue
1348
1349             pending_classes = []
1350             if done_classes is not None:
1351                 pending_classes = list(set(classes) - set(done_classes))
1352             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1353                                                         data_hash=data_hash,
1354                                                         copies=copies - done_copies,
1355                                                         max_service_replicas=self.max_replicas_per_service,
1356                                                         timeout=self.current_timeout(num_retries - tries_left),
1357                                                         classes=pending_classes)
1358             for service_root, ks in [(root, roots_map[root])
1359                                      for root in sorted_roots]:
1360                 if ks.finished():
1361                     continue
1362                 writer_pool.add_task(ks, service_root)
1363             writer_pool.join()
1364             pool_copies, pool_classes = writer_pool.done()
1365             done_copies += pool_copies
1366             if (done_classes is not None) and (pool_classes is not None):
1367                 done_classes += pool_classes
1368                 loop.save_result(
1369                     (done_copies >= copies and set(done_classes) == set(classes),
1370                     writer_pool.total_task_nr))
1371             else:
1372                 # Old keepstore contacted without storage classes support:
1373                 # success is determined only by successful copies.
1374                 #
1375                 # Disable storage classes tracking from this point forward.
1376                 if not self._storage_classes_unsupported_warning:
1377                     self._storage_classes_unsupported_warning = True
1378                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1379                 done_classes = None
1380                 loop.save_result(
1381                     (done_copies >= copies, writer_pool.total_task_nr))
1382
1383         if loop.success():
1384             return writer_pool.response()
1385         if not roots_map:
1386             raise arvados.errors.KeepWriteError(
1387                 "[{}] failed to write {}: no Keep services available ({})".format(
1388                     request_id, data_hash, loop.last_result()))
1389         else:
1390             service_errors = ((key, roots_map[key].last_result()['error'])
1391                               for key in sorted_roots
1392                               if roots_map[key].last_result()['error'])
1393             raise arvados.errors.KeepWriteError(
1394                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1395                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1396
1397     def _block_prefetch_worker(self):
1398         """The background downloader thread."""
1399         while True:
1400             try:
1401                 b = self._prefetch_queue.get()
1402                 if b is None:
1403                     return
1404                 self.get(b, prefetch=True)
1405             except Exception:
1406                 _logger.exception("Exception doing block prefetch")
1407
1408     def _start_prefetch_threads(self):
1409         if self._prefetch_threads is None:
1410             with self.lock:
1411                 if self._prefetch_threads is not None:
1412                     return
1413                 self._prefetch_queue = queue.Queue()
1414                 self._prefetch_threads = []
1415                 for i in range(0, self.num_prefetch_threads):
1416                     thread = threading.Thread(target=self._block_prefetch_worker)
1417                     self._prefetch_threads.append(thread)
1418                     thread.daemon = True
1419                     thread.start()
1420
1421     def block_prefetch(self, locator):
1422         """
1423         This relies on the fact that KeepClient implements a block cache,
1424         so repeated requests for the same block will not result in repeated
1425         downloads (unless the block is evicted from the cache.)  This method
1426         does not block.
1427         """
1428
1429         self._start_prefetch_threads()
1430         self._prefetch_queue.put(locator)
1431
1432     def stop_prefetch_threads(self):
1433         with self.lock:
1434             if self._prefetch_threads is not None:
1435                 for t in self._prefetch_threads:
1436                     self._prefetch_queue.put(None)
1437                 for t in self._prefetch_threads:
1438                     t.join()
1439             self._prefetch_threads = None
1440             self._prefetch_queue = None
1441
1442     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1443         """A stub for put().
1444
1445         This method is used in place of the real put() method when
1446         using local storage (see constructor's local_store argument).
1447
1448         copies and num_retries arguments are ignored: they are here
1449         only for the sake of offering the same call signature as
1450         put().
1451
1452         Data stored this way can be retrieved via local_store_get().
1453         """
1454         md5 = hashlib.md5(data).hexdigest()
1455         locator = '%s+%d' % (md5, len(data))
1456         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1457             f.write(data)
1458         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1459                   os.path.join(self.local_store, md5))
1460         return locator
1461
1462     def local_store_get(self, loc_s, num_retries=None):
1463         """Companion to local_store_put()."""
1464         try:
1465             locator = KeepLocator(loc_s)
1466         except ValueError:
1467             raise arvados.errors.NotFoundError(
1468                 "Invalid data locator: '%s'" % loc_s)
1469         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1470             return b''
1471         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1472             return f.read()
1473
1474     def local_store_head(self, loc_s, num_retries=None):
1475         """Companion to local_store_put()."""
1476         try:
1477             locator = KeepLocator(loc_s)
1478         except ValueError:
1479             raise arvados.errors.NotFoundError(
1480                 "Invalid data locator: '%s'" % loc_s)
1481         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1482             return True
1483         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1484             return True