15397: Remove code, configs, and docs for hosted git repo feature.
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 import copy
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
15 import collections
16 import datetime
17 import hashlib
18 import errno
19 import io
20 import logging
21 import math
22 import os
23 import pycurl
24 import queue
25 import re
26 import socket
27 import ssl
28 import sys
29 import threading
30 import resource
31 from . import timer
32 import urllib.parse
33 import traceback
34 import weakref
35
36 if sys.version_info >= (3, 0):
37     from io import BytesIO
38 else:
39     from cStringIO import StringIO as BytesIO
40
41 import arvados
42 import arvados.config as config
43 import arvados.errors
44 import arvados.retry as retry
45 import arvados.util
46 import arvados.diskcache
47 from arvados._pycurlhelper import PyCurlHelper
48
49 _logger = logging.getLogger('arvados.keep')
50 global_client_object = None
51
52
53 # Monkey patch TCP constants when not available (apple). Values sourced from:
54 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
55 if sys.platform == 'darwin':
56     if not hasattr(socket, 'TCP_KEEPALIVE'):
57         socket.TCP_KEEPALIVE = 0x010
58     if not hasattr(socket, 'TCP_KEEPINTVL'):
59         socket.TCP_KEEPINTVL = 0x101
60     if not hasattr(socket, 'TCP_KEEPCNT'):
61         socket.TCP_KEEPCNT = 0x102
62
63
64 class KeepLocator(object):
65     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
66     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
67
68     def __init__(self, locator_str):
69         self.hints = []
70         self._perm_sig = None
71         self._perm_expiry = None
72         pieces = iter(locator_str.split('+'))
73         self.md5sum = next(pieces)
74         try:
75             self.size = int(next(pieces))
76         except StopIteration:
77             self.size = None
78         for hint in pieces:
79             if self.HINT_RE.match(hint) is None:
80                 raise ValueError("invalid hint format: {}".format(hint))
81             elif hint.startswith('A'):
82                 self.parse_permission_hint(hint)
83             else:
84                 self.hints.append(hint)
85
86     def __str__(self):
87         return '+'.join(
88             native_str(s)
89             for s in [self.md5sum, self.size,
90                       self.permission_hint()] + self.hints
91             if s is not None)
92
93     def stripped(self):
94         if self.size is not None:
95             return "%s+%i" % (self.md5sum, self.size)
96         else:
97             return self.md5sum
98
99     def _make_hex_prop(name, length):
100         # Build and return a new property with the given name that
101         # must be a hex string of the given length.
102         data_name = '_{}'.format(name)
103         def getter(self):
104             return getattr(self, data_name)
105         def setter(self, hex_str):
106             if not arvados.util.is_hex(hex_str, length):
107                 raise ValueError("{} is not a {}-digit hex string: {!r}".
108                                  format(name, length, hex_str))
109             setattr(self, data_name, hex_str)
110         return property(getter, setter)
111
112     md5sum = _make_hex_prop('md5sum', 32)
113     perm_sig = _make_hex_prop('perm_sig', 40)
114
115     @property
116     def perm_expiry(self):
117         return self._perm_expiry
118
119     @perm_expiry.setter
120     def perm_expiry(self, value):
121         if not arvados.util.is_hex(value, 1, 8):
122             raise ValueError(
123                 "permission timestamp must be a hex Unix timestamp: {}".
124                 format(value))
125         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
126
127     def permission_hint(self):
128         data = [self.perm_sig, self.perm_expiry]
129         if None in data:
130             return None
131         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
132         return "A{}@{:08x}".format(*data)
133
134     def parse_permission_hint(self, s):
135         try:
136             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
137         except IndexError:
138             raise ValueError("bad permission hint {}".format(s))
139
140     def permission_expired(self, as_of_dt=None):
141         if self.perm_expiry is None:
142             return False
143         elif as_of_dt is None:
144             as_of_dt = datetime.datetime.now()
145         return self.perm_expiry <= as_of_dt
146
147
148 class Keep(object):
149     """Simple interface to a global KeepClient object.
150
151     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
152     own API client.  The global KeepClient will build an API client from the
153     current Arvados configuration, which may not match the one you built.
154     """
155     _last_key = None
156
157     @classmethod
158     def global_client_object(cls):
159         global global_client_object
160         # Previously, KeepClient would change its behavior at runtime based
161         # on these configuration settings.  We simulate that behavior here
162         # by checking the values and returning a new KeepClient if any of
163         # them have changed.
164         key = (config.get('ARVADOS_API_HOST'),
165                config.get('ARVADOS_API_TOKEN'),
166                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
167                config.get('ARVADOS_KEEP_PROXY'),
168                os.environ.get('KEEP_LOCAL_STORE'))
169         if (global_client_object is None) or (cls._last_key != key):
170             global_client_object = KeepClient()
171             cls._last_key = key
172         return global_client_object
173
174     @staticmethod
175     def get(locator, **kwargs):
176         return Keep.global_client_object().get(locator, **kwargs)
177
178     @staticmethod
179     def put(data, **kwargs):
180         return Keep.global_client_object().put(data, **kwargs)
181
182 class KeepBlockCache(object):
183     def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
184         self.cache_max = cache_max
185         self._cache = collections.OrderedDict()
186         self._cache_lock = threading.Lock()
187         self._max_slots = max_slots
188         self._disk_cache = disk_cache
189         self._disk_cache_dir = disk_cache_dir
190         self._cache_updating = threading.Condition(self._cache_lock)
191
192         if self._disk_cache and self._disk_cache_dir is None:
193             self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
194             os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
195
196         if self._max_slots == 0:
197             if self._disk_cache:
198                 # Each block uses two file descriptors, one used to
199                 # open it initially and hold the flock(), and a second
200                 # hidden one used by mmap().
201                 #
202                 # Set max slots to 1/8 of maximum file handles.  This
203                 # means we'll use at most 1/4 of total file handles.
204                 #
205                 # NOFILE typically defaults to 1024 on Linux so this
206                 # is 128 slots (256 file handles), which means we can
207                 # cache up to 8 GiB of 64 MiB blocks.  This leaves
208                 # 768 file handles for sockets and other stuff.
209                 #
210                 # When we want the ability to have more cache (e.g. in
211                 # arv-mount) we'll increase rlimit before calling
212                 # this.
213                 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
214             else:
215                 # RAM cache slots
216                 self._max_slots = 512
217
218         if self.cache_max == 0:
219             if self._disk_cache:
220                 fs = os.statvfs(self._disk_cache_dir)
221                 # Calculation of available space incorporates existing cache usage
222                 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
223                 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
224                 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
225                 # pick smallest of:
226                 # 10% of total disk size
227                 # 25% of available space
228                 # max_slots * 64 MiB
229                 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
230             else:
231                 # 256 MiB in RAM
232                 self.cache_max = (256 * 1024 * 1024)
233
234         self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
235
236         self.cache_total = 0
237         if self._disk_cache:
238             self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
239             for slot in self._cache.values():
240                 self.cache_total += slot.size()
241             self.cap_cache()
242
243     class CacheSlot(object):
244         __slots__ = ("locator", "ready", "content")
245
246         def __init__(self, locator):
247             self.locator = locator
248             self.ready = threading.Event()
249             self.content = None
250
251         def get(self):
252             self.ready.wait()
253             return self.content
254
255         def set(self, value):
256             if self.content is not None:
257                 return False
258             self.content = value
259             self.ready.set()
260             return True
261
262         def size(self):
263             if self.content is None:
264                 return 0
265             else:
266                 return len(self.content)
267
268         def evict(self):
269             self.content = None
270
271
272     def _resize_cache(self, cache_max, max_slots):
273         # Try and make sure the contents of the cache do not exceed
274         # the supplied maximums.
275
276         if self.cache_total <= cache_max and len(self._cache) <= max_slots:
277             return
278
279         _evict_candidates = collections.deque(self._cache.values())
280         while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
281             slot = _evict_candidates.popleft()
282             if not slot.ready.is_set():
283                 continue
284
285             sz = slot.size()
286             slot.evict()
287             self.cache_total -= sz
288             del self._cache[slot.locator]
289
290
291     def cap_cache(self):
292         '''Cap the cache size to self.cache_max'''
293         with self._cache_updating:
294             self._resize_cache(self.cache_max, self._max_slots)
295             self._cache_updating.notify_all()
296
297     def _get(self, locator):
298         # Test if the locator is already in the cache
299         if locator in self._cache:
300             n = self._cache[locator]
301             if n.ready.is_set() and n.content is None:
302                 del self._cache[n.locator]
303                 return None
304             self._cache.move_to_end(locator)
305             return n
306         if self._disk_cache:
307             # see if it exists on disk
308             n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
309             if n is not None:
310                 self._cache[n.locator] = n
311                 self.cache_total += n.size()
312                 return n
313         return None
314
315     def get(self, locator):
316         with self._cache_lock:
317             return self._get(locator)
318
319     def reserve_cache(self, locator):
320         '''Reserve a cache slot for the specified locator,
321         or return the existing slot.'''
322         with self._cache_updating:
323             n = self._get(locator)
324             if n:
325                 return n, False
326             else:
327                 # Add a new cache slot for the locator
328                 self._resize_cache(self.cache_max, self._max_slots-1)
329                 while len(self._cache) >= self._max_slots:
330                     # If there isn't a slot available, need to wait
331                     # for something to happen that releases one of the
332                     # cache slots.  Idle for 200 ms or woken up by
333                     # another thread
334                     self._cache_updating.wait(timeout=0.2)
335                     self._resize_cache(self.cache_max, self._max_slots-1)
336
337                 if self._disk_cache:
338                     n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
339                 else:
340                     n = KeepBlockCache.CacheSlot(locator)
341                 self._cache[n.locator] = n
342                 return n, True
343
344     def set(self, slot, blob):
345         try:
346             if slot.set(blob):
347                 self.cache_total += slot.size()
348             return
349         except OSError as e:
350             if e.errno == errno.ENOMEM:
351                 # Reduce max slots to current - 4, cap cache and retry
352                 with self._cache_lock:
353                     self._max_slots = max(4, len(self._cache) - 4)
354             elif e.errno == errno.ENOSPC:
355                 # Reduce disk max space to current - 256 MiB, cap cache and retry
356                 with self._cache_lock:
357                     sm = sum(st.size() for st in self._cache.values())
358                     self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
359             elif e.errno == errno.ENODEV:
360                 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
361         except Exception as e:
362             pass
363         finally:
364             # Check if we should evict things from the cache.  Either
365             # because we added a new thing or there was an error and
366             # we possibly adjusted the limits down, so we might need
367             # to push something out.
368             self.cap_cache()
369
370         try:
371             # Only gets here if there was an error the first time. The
372             # exception handler adjusts limits downward in some cases
373             # to free up resources, which would make the operation
374             # succeed.
375             if slot.set(blob):
376                 self.cache_total += slot.size()
377         except Exception as e:
378             # It failed again.  Give up.
379             slot.set(None)
380             raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
381
382         self.cap_cache()
383
384 class Counter(object):
385     def __init__(self, v=0):
386         self._lk = threading.Lock()
387         self._val = v
388
389     def add(self, v):
390         with self._lk:
391             self._val += v
392
393     def get(self):
394         with self._lk:
395             return self._val
396
397
398 class KeepClient(object):
399     DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
400     DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
401
402     class KeepService(PyCurlHelper):
403         """Make requests to a single Keep service, and track results.
404
405         A KeepService is intended to last long enough to perform one
406         transaction (GET or PUT) against one Keep service. This can
407         involve calling either get() or put() multiple times in order
408         to retry after transient failures. However, calling both get()
409         and put() on a single instance -- or using the same instance
410         to access two different Keep services -- will not produce
411         sensible behavior.
412         """
413
414         HTTP_ERRORS = (
415             socket.error,
416             ssl.SSLError,
417             arvados.errors.HttpError,
418         )
419
420         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
421                      upload_counter=None,
422                      download_counter=None,
423                      headers={},
424                      insecure=False):
425             super(KeepClient.KeepService, self).__init__()
426             self.root = root
427             self._user_agent_pool = user_agent_pool
428             self._result = {'error': None}
429             self._usable = True
430             self._session = None
431             self._socket = None
432             self.get_headers = {'Accept': 'application/octet-stream'}
433             self.get_headers.update(headers)
434             self.put_headers = headers
435             self.upload_counter = upload_counter
436             self.download_counter = download_counter
437             self.insecure = insecure
438
439         def usable(self):
440             """Is it worth attempting a request?"""
441             return self._usable
442
443         def finished(self):
444             """Did the request succeed or encounter permanent failure?"""
445             return self._result['error'] == False or not self._usable
446
447         def last_result(self):
448             return self._result
449
450         def _get_user_agent(self):
451             try:
452                 return self._user_agent_pool.get(block=False)
453             except queue.Empty:
454                 return pycurl.Curl()
455
456         def _put_user_agent(self, ua):
457             try:
458                 ua.reset()
459                 self._user_agent_pool.put(ua, block=False)
460             except:
461                 ua.close()
462
463         def get(self, locator, method="GET", timeout=None):
464             # locator is a KeepLocator object.
465             url = self.root + str(locator)
466             _logger.debug("Request: %s %s", method, url)
467             curl = self._get_user_agent()
468             ok = None
469             try:
470                 with timer.Timer() as t:
471                     self._headers = {}
472                     response_body = BytesIO()
473                     curl.setopt(pycurl.NOSIGNAL, 1)
474                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
475                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
476                     curl.setopt(pycurl.URL, url.encode('utf-8'))
477                     curl.setopt(pycurl.HTTPHEADER, [
478                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
479                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
480                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
481                     if self.insecure:
482                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
483                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
484                     else:
485                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
486                     if method == "HEAD":
487                         curl.setopt(pycurl.NOBODY, True)
488                     else:
489                         curl.setopt(pycurl.HTTPGET, True)
490                     self._setcurltimeouts(curl, timeout, method=="HEAD")
491
492                     try:
493                         curl.perform()
494                     except Exception as e:
495                         raise arvados.errors.HttpError(0, str(e))
496                     finally:
497                         if self._socket:
498                             self._socket.close()
499                             self._socket = None
500                     self._result = {
501                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
502                         'body': response_body.getvalue(),
503                         'headers': self._headers,
504                         'error': False,
505                     }
506
507                 ok = retry.check_http_response_success(self._result['status_code'])
508                 if not ok:
509                     self._result['error'] = arvados.errors.HttpError(
510                         self._result['status_code'],
511                         self._headers.get('x-status-line', 'Error'))
512             except self.HTTP_ERRORS as e:
513                 self._result = {
514                     'error': e,
515                 }
516             self._usable = ok != False
517             if self._result.get('status_code', None):
518                 # The client worked well enough to get an HTTP status
519                 # code, so presumably any problems are just on the
520                 # server side and it's OK to reuse the client.
521                 self._put_user_agent(curl)
522             else:
523                 # Don't return this client to the pool, in case it's
524                 # broken.
525                 curl.close()
526             if not ok:
527                 _logger.debug("Request fail: GET %s => %s: %s",
528                               url, type(self._result['error']), str(self._result['error']))
529                 return None
530             if method == "HEAD":
531                 _logger.info("HEAD %s: %s bytes",
532                          self._result['status_code'],
533                          self._result.get('content-length'))
534                 if self._result['headers'].get('x-keep-locator'):
535                     # This is a response to a remote block copy request, return
536                     # the local copy block locator.
537                     return self._result['headers'].get('x-keep-locator')
538                 return True
539
540             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
541                          self._result['status_code'],
542                          len(self._result['body']),
543                          t.msecs,
544                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
545
546             if self.download_counter:
547                 self.download_counter.add(len(self._result['body']))
548             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
549             if resp_md5 != locator.md5sum:
550                 _logger.warning("Checksum fail: md5(%s) = %s",
551                                 url, resp_md5)
552                 self._result['error'] = arvados.errors.HttpError(
553                     0, 'Checksum fail')
554                 return None
555             return self._result['body']
556
557         def put(self, hash_s, body, timeout=None, headers={}):
558             put_headers = copy.copy(self.put_headers)
559             put_headers.update(headers)
560             url = self.root + hash_s
561             _logger.debug("Request: PUT %s", url)
562             curl = self._get_user_agent()
563             ok = None
564             try:
565                 with timer.Timer() as t:
566                     self._headers = {}
567                     body_reader = BytesIO(body)
568                     response_body = BytesIO()
569                     curl.setopt(pycurl.NOSIGNAL, 1)
570                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
571                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
572                     curl.setopt(pycurl.URL, url.encode('utf-8'))
573                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
574                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
575                     # response) instead of sending the request body immediately.
576                     # This allows the server to reject the request if the request
577                     # is invalid or the server is read-only, without waiting for
578                     # the client to send the entire block.
579                     curl.setopt(pycurl.UPLOAD, True)
580                     curl.setopt(pycurl.INFILESIZE, len(body))
581                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
582                     curl.setopt(pycurl.HTTPHEADER, [
583                         '{}: {}'.format(k,v) for k,v in put_headers.items()])
584                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
585                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
586                     if self.insecure:
587                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
588                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
589                     else:
590                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
591                     self._setcurltimeouts(curl, timeout)
592                     try:
593                         curl.perform()
594                     except Exception as e:
595                         raise arvados.errors.HttpError(0, str(e))
596                     finally:
597                         if self._socket:
598                             self._socket.close()
599                             self._socket = None
600                     self._result = {
601                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
602                         'body': response_body.getvalue().decode('utf-8'),
603                         'headers': self._headers,
604                         'error': False,
605                     }
606                 ok = retry.check_http_response_success(self._result['status_code'])
607                 if not ok:
608                     self._result['error'] = arvados.errors.HttpError(
609                         self._result['status_code'],
610                         self._headers.get('x-status-line', 'Error'))
611             except self.HTTP_ERRORS as e:
612                 self._result = {
613                     'error': e,
614                 }
615             self._usable = ok != False # still usable if ok is True or None
616             if self._result.get('status_code', None):
617                 # Client is functional. See comment in get().
618                 self._put_user_agent(curl)
619             else:
620                 curl.close()
621             if not ok:
622                 _logger.debug("Request fail: PUT %s => %s: %s",
623                               url, type(self._result['error']), str(self._result['error']))
624                 return False
625             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
626                          self._result['status_code'],
627                          len(body),
628                          t.msecs,
629                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
630             if self.upload_counter:
631                 self.upload_counter.add(len(body))
632             return True
633
634
635     class KeepWriterQueue(queue.Queue):
636         def __init__(self, copies, classes=[]):
637             queue.Queue.__init__(self) # Old-style superclass
638             self.wanted_copies = copies
639             self.wanted_storage_classes = classes
640             self.successful_copies = 0
641             self.confirmed_storage_classes = {}
642             self.response = None
643             self.storage_classes_tracking = True
644             self.queue_data_lock = threading.RLock()
645             self.pending_tries = max(copies, len(classes))
646             self.pending_tries_notification = threading.Condition()
647
648         def write_success(self, response, replicas_nr, classes_confirmed):
649             with self.queue_data_lock:
650                 self.successful_copies += replicas_nr
651                 if classes_confirmed is None:
652                     self.storage_classes_tracking = False
653                 elif self.storage_classes_tracking:
654                     for st_class, st_copies in classes_confirmed.items():
655                         try:
656                             self.confirmed_storage_classes[st_class] += st_copies
657                         except KeyError:
658                             self.confirmed_storage_classes[st_class] = st_copies
659                     self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
660                 self.response = response
661             with self.pending_tries_notification:
662                 self.pending_tries_notification.notify_all()
663
664         def write_fail(self, ks):
665             with self.pending_tries_notification:
666                 self.pending_tries += 1
667                 self.pending_tries_notification.notify()
668
669         def pending_copies(self):
670             with self.queue_data_lock:
671                 return self.wanted_copies - self.successful_copies
672
673         def satisfied_classes(self):
674             with self.queue_data_lock:
675                 if not self.storage_classes_tracking:
676                     # Notifies disabled storage classes expectation to
677                     # the outer loop.
678                     return None
679             return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
680
681         def pending_classes(self):
682             with self.queue_data_lock:
683                 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
684                     return []
685                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
686                 for st_class, st_copies in self.confirmed_storage_classes.items():
687                     if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
688                         unsatisfied_classes.remove(st_class)
689                 return unsatisfied_classes
690
691         def get_next_task(self):
692             with self.pending_tries_notification:
693                 while True:
694                     if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
695                         # This notify_all() is unnecessary --
696                         # write_success() already called notify_all()
697                         # when pending<1 became true, so it's not
698                         # possible for any other thread to be in
699                         # wait() now -- but it's cheap insurance
700                         # against deadlock so we do it anyway:
701                         self.pending_tries_notification.notify_all()
702                         # Drain the queue and then raise Queue.Empty
703                         while True:
704                             self.get_nowait()
705                             self.task_done()
706                     elif self.pending_tries > 0:
707                         service, service_root = self.get_nowait()
708                         if service.finished():
709                             self.task_done()
710                             continue
711                         self.pending_tries -= 1
712                         return service, service_root
713                     elif self.empty():
714                         self.pending_tries_notification.notify_all()
715                         raise queue.Empty
716                     else:
717                         self.pending_tries_notification.wait()
718
719
720     class KeepWriterThreadPool(object):
721         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
722             self.total_task_nr = 0
723             if (not max_service_replicas) or (max_service_replicas >= copies):
724                 num_threads = 1
725             else:
726                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
727             _logger.debug("Pool max threads is %d", num_threads)
728             self.workers = []
729             self.queue = KeepClient.KeepWriterQueue(copies, classes)
730             # Create workers
731             for _ in range(num_threads):
732                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
733                 self.workers.append(w)
734
735         def add_task(self, ks, service_root):
736             self.queue.put((ks, service_root))
737             self.total_task_nr += 1
738
739         def done(self):
740             return self.queue.successful_copies, self.queue.satisfied_classes()
741
742         def join(self):
743             # Start workers
744             for worker in self.workers:
745                 worker.start()
746             # Wait for finished work
747             self.queue.join()
748
749         def response(self):
750             return self.queue.response
751
752
753     class KeepWriterThread(threading.Thread):
754         class TaskFailed(RuntimeError): pass
755
756         def __init__(self, queue, data, data_hash, timeout=None):
757             super(KeepClient.KeepWriterThread, self).__init__()
758             self.timeout = timeout
759             self.queue = queue
760             self.data = data
761             self.data_hash = data_hash
762             self.daemon = True
763
764         def run(self):
765             while True:
766                 try:
767                     service, service_root = self.queue.get_next_task()
768                 except queue.Empty:
769                     return
770                 try:
771                     locator, copies, classes = self.do_task(service, service_root)
772                 except Exception as e:
773                     if not isinstance(e, self.TaskFailed):
774                         _logger.exception("Exception in KeepWriterThread")
775                     self.queue.write_fail(service)
776                 else:
777                     self.queue.write_success(locator, copies, classes)
778                 finally:
779                     self.queue.task_done()
780
781         def do_task(self, service, service_root):
782             classes = self.queue.pending_classes()
783             headers = {}
784             if len(classes) > 0:
785                 classes.sort()
786                 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
787             success = bool(service.put(self.data_hash,
788                                         self.data,
789                                         timeout=self.timeout,
790                                         headers=headers))
791             result = service.last_result()
792
793             if not success:
794                 if result.get('status_code'):
795                     _logger.debug("Request fail: PUT %s => %s %s",
796                                   self.data_hash,
797                                   result.get('status_code'),
798                                   result.get('body'))
799                 raise self.TaskFailed()
800
801             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
802                           str(threading.current_thread()),
803                           self.data_hash,
804                           len(self.data),
805                           service_root)
806             try:
807                 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
808             except (KeyError, ValueError):
809                 replicas_stored = 1
810
811             classes_confirmed = {}
812             try:
813                 scch = result['headers']['x-keep-storage-classes-confirmed']
814                 for confirmation in scch.replace(' ', '').split(','):
815                     if '=' in confirmation:
816                         stored_class, stored_copies = confirmation.split('=')[:2]
817                         classes_confirmed[stored_class] = int(stored_copies)
818             except (KeyError, ValueError):
819                 # Storage classes confirmed header missing or corrupt
820                 classes_confirmed = None
821
822             return result['body'].strip(), replicas_stored, classes_confirmed
823
824
825     def __init__(self, api_client=None, proxy=None,
826                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
827                  api_token=None, local_store=None, block_cache=None,
828                  num_retries=10, session=None, num_prefetch_threads=None):
829         """Initialize a new KeepClient.
830
831         Arguments:
832         :api_client:
833           The API client to use to find Keep services.  If not
834           provided, KeepClient will build one from available Arvados
835           configuration.
836
837         :proxy:
838           If specified, this KeepClient will send requests to this Keep
839           proxy.  Otherwise, KeepClient will fall back to the setting of the
840           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
841           If you want to KeepClient does not use a proxy, pass in an empty
842           string.
843
844         :timeout:
845           The initial timeout (in seconds) for HTTP requests to Keep
846           non-proxy servers.  A tuple of three floats is interpreted as
847           (connection_timeout, read_timeout, minimum_bandwidth). A connection
848           will be aborted if the average traffic rate falls below
849           minimum_bandwidth bytes per second over an interval of read_timeout
850           seconds. Because timeouts are often a result of transient server
851           load, the actual connection timeout will be increased by a factor
852           of two on each retry.
853           Default: (2, 256, 32768).
854
855         :proxy_timeout:
856           The initial timeout (in seconds) for HTTP requests to
857           Keep proxies. A tuple of three floats is interpreted as
858           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
859           described above for adjusting connection timeouts on retry also
860           applies.
861           Default: (20, 256, 32768).
862
863         :api_token:
864           If you're not using an API client, but only talking
865           directly to a Keep proxy, this parameter specifies an API token
866           to authenticate Keep requests.  It is an error to specify both
867           api_client and api_token.  If you specify neither, KeepClient
868           will use one available from the Arvados configuration.
869
870         :local_store:
871           If specified, this KeepClient will bypass Keep
872           services, and save data to the named directory.  If unspecified,
873           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
874           environment variable.  If you want to ensure KeepClient does not
875           use local storage, pass in an empty string.  This is primarily
876           intended to mock a server for testing.
877
878         :num_retries:
879           The default number of times to retry failed requests.
880           This will be used as the default num_retries value when get() and
881           put() are called.  Default 10.
882         """
883         self.lock = threading.Lock()
884         if proxy is None:
885             if config.get('ARVADOS_KEEP_SERVICES'):
886                 proxy = config.get('ARVADOS_KEEP_SERVICES')
887             else:
888                 proxy = config.get('ARVADOS_KEEP_PROXY')
889         if api_token is None:
890             if api_client is None:
891                 api_token = config.get('ARVADOS_API_TOKEN')
892             else:
893                 api_token = api_client.api_token
894         elif api_client is not None:
895             raise ValueError(
896                 "can't build KeepClient with both API client and token")
897         if local_store is None:
898             local_store = os.environ.get('KEEP_LOCAL_STORE')
899
900         if api_client is None:
901             self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
902         else:
903             self.insecure = api_client.insecure
904
905         self.block_cache = block_cache if block_cache else KeepBlockCache()
906         self.timeout = timeout
907         self.proxy_timeout = proxy_timeout
908         self._user_agent_pool = queue.LifoQueue()
909         self.upload_counter = Counter()
910         self.download_counter = Counter()
911         self.put_counter = Counter()
912         self.get_counter = Counter()
913         self.hits_counter = Counter()
914         self.misses_counter = Counter()
915         self._storage_classes_unsupported_warning = False
916         self._default_classes = []
917         if num_prefetch_threads is not None:
918             self.num_prefetch_threads = num_prefetch_threads
919         else:
920             self.num_prefetch_threads = 2
921         self._prefetch_queue = None
922         self._prefetch_threads = None
923
924         if local_store:
925             self.local_store = local_store
926             self.head = self.local_store_head
927             self.get = self.local_store_get
928             self.put = self.local_store_put
929         else:
930             self.num_retries = num_retries
931             self.max_replicas_per_service = None
932             if proxy:
933                 proxy_uris = proxy.split()
934                 for i in range(len(proxy_uris)):
935                     if not proxy_uris[i].endswith('/'):
936                         proxy_uris[i] += '/'
937                     # URL validation
938                     url = urllib.parse.urlparse(proxy_uris[i])
939                     if not (url.scheme and url.netloc):
940                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
941                 self.api_token = api_token
942                 self._gateway_services = {}
943                 self._keep_services = [{
944                     'uuid': "00000-bi6l4-%015d" % idx,
945                     'service_type': 'proxy',
946                     '_service_root': uri,
947                     } for idx, uri in enumerate(proxy_uris)]
948                 self._writable_services = self._keep_services
949                 self.using_proxy = True
950                 self._static_services_list = True
951             else:
952                 # It's important to avoid instantiating an API client
953                 # unless we actually need one, for testing's sake.
954                 if api_client is None:
955                     api_client = arvados.api('v1')
956                 self.api_client = api_client
957                 self.api_token = api_client.api_token
958                 self._gateway_services = {}
959                 self._keep_services = None
960                 self._writable_services = None
961                 self.using_proxy = None
962                 self._static_services_list = False
963                 try:
964                     self._default_classes = [
965                         k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
966                 except KeyError:
967                     # We're talking to an old cluster
968                     pass
969
970     def current_timeout(self, attempt_number):
971         """Return the appropriate timeout to use for this client.
972
973         The proxy timeout setting if the backend service is currently a proxy,
974         the regular timeout setting otherwise.  The `attempt_number` indicates
975         how many times the operation has been tried already (starting from 0
976         for the first try), and scales the connection timeout portion of the
977         return value accordingly.
978
979         """
980         # TODO(twp): the timeout should be a property of a
981         # KeepService, not a KeepClient. See #4488.
982         t = self.proxy_timeout if self.using_proxy else self.timeout
983         if len(t) == 2:
984             return (t[0] * (1 << attempt_number), t[1])
985         else:
986             return (t[0] * (1 << attempt_number), t[1], t[2])
987     def _any_nondisk_services(self, service_list):
988         return any(ks.get('service_type', 'disk') != 'disk'
989                    for ks in service_list)
990
991     def build_services_list(self, force_rebuild=False):
992         if (self._static_services_list or
993               (self._keep_services and not force_rebuild)):
994             return
995         with self.lock:
996             try:
997                 keep_services = self.api_client.keep_services().accessible()
998             except Exception:  # API server predates Keep services.
999                 keep_services = self.api_client.keep_disks().list()
1000
1001             # Gateway services are only used when specified by UUID,
1002             # so there's nothing to gain by filtering them by
1003             # service_type.
1004             self._gateway_services = {ks['uuid']: ks for ks in
1005                                       keep_services.execute()['items']}
1006             if not self._gateway_services:
1007                 raise arvados.errors.NoKeepServersError()
1008
1009             # Precompute the base URI for each service.
1010             for r in self._gateway_services.values():
1011                 host = r['service_host']
1012                 if not host.startswith('[') and host.find(':') >= 0:
1013                     # IPv6 URIs must be formatted like http://[::1]:80/...
1014                     host = '[' + host + ']'
1015                 r['_service_root'] = "{}://{}:{:d}/".format(
1016                     'https' if r['service_ssl_flag'] else 'http',
1017                     host,
1018                     r['service_port'])
1019
1020             _logger.debug(str(self._gateway_services))
1021             self._keep_services = [
1022                 ks for ks in self._gateway_services.values()
1023                 if not ks.get('service_type', '').startswith('gateway:')]
1024             self._writable_services = [ks for ks in self._keep_services
1025                                        if not ks.get('read_only')]
1026
1027             # For disk type services, max_replicas_per_service is 1
1028             # It is unknown (unlimited) for other service types.
1029             if self._any_nondisk_services(self._writable_services):
1030                 self.max_replicas_per_service = None
1031             else:
1032                 self.max_replicas_per_service = 1
1033
1034     def _service_weight(self, data_hash, service_uuid):
1035         """Compute the weight of a Keep service endpoint for a data
1036         block with a known hash.
1037
1038         The weight is md5(h + u) where u is the last 15 characters of
1039         the service endpoint's UUID.
1040         """
1041         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1042
1043     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1044         """Return an array of Keep service endpoints, in the order in
1045         which they should be probed when reading or writing data with
1046         the given hash+hints.
1047         """
1048         self.build_services_list(force_rebuild)
1049
1050         sorted_roots = []
1051         # Use the services indicated by the given +K@... remote
1052         # service hints, if any are present and can be resolved to a
1053         # URI.
1054         for hint in locator.hints:
1055             if hint.startswith('K@'):
1056                 if len(hint) == 7:
1057                     sorted_roots.append(
1058                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1059                 elif len(hint) == 29:
1060                     svc = self._gateway_services.get(hint[2:])
1061                     if svc:
1062                         sorted_roots.append(svc['_service_root'])
1063
1064         # Sort the available local services by weight (heaviest first)
1065         # for this locator, and return their service_roots (base URIs)
1066         # in that order.
1067         use_services = self._keep_services
1068         if need_writable:
1069             use_services = self._writable_services
1070         self.using_proxy = self._any_nondisk_services(use_services)
1071         sorted_roots.extend([
1072             svc['_service_root'] for svc in sorted(
1073                 use_services,
1074                 reverse=True,
1075                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1076         _logger.debug("{}: {}".format(locator, sorted_roots))
1077         return sorted_roots
1078
1079     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1080         # roots_map is a dictionary, mapping Keep service root strings
1081         # to KeepService objects.  Poll for Keep services, and add any
1082         # new ones to roots_map.  Return the current list of local
1083         # root strings.
1084         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1085         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1086         for root in local_roots:
1087             if root not in roots_map:
1088                 roots_map[root] = self.KeepService(
1089                     root, self._user_agent_pool,
1090                     upload_counter=self.upload_counter,
1091                     download_counter=self.download_counter,
1092                     headers=headers,
1093                     insecure=self.insecure)
1094         return local_roots
1095
1096     @staticmethod
1097     def _check_loop_result(result):
1098         # KeepClient RetryLoops should save results as a 2-tuple: the
1099         # actual result of the request, and the number of servers available
1100         # to receive the request this round.
1101         # This method returns True if there's a real result, False if
1102         # there are no more servers available, otherwise None.
1103         if isinstance(result, Exception):
1104             return None
1105         result, tried_server_count = result
1106         if (result is not None) and (result is not False):
1107             return True
1108         elif tried_server_count < 1:
1109             _logger.info("No more Keep services to try; giving up")
1110             return False
1111         else:
1112             return None
1113
1114     def get_from_cache(self, loc_s):
1115         """Fetch a block only if is in the cache, otherwise return None."""
1116         locator = KeepLocator(loc_s)
1117         slot = self.block_cache.get(locator.md5sum)
1118         if slot is not None and slot.ready.is_set():
1119             return slot.get()
1120         else:
1121             return None
1122
1123     def refresh_signature(self, loc):
1124         """Ask Keep to get the remote block and return its local signature"""
1125         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1126         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1127
1128     @retry.retry_method
1129     def head(self, loc_s, **kwargs):
1130         return self._get_or_head(loc_s, method="HEAD", **kwargs)
1131
1132     @retry.retry_method
1133     def get(self, loc_s, **kwargs):
1134         return self._get_or_head(loc_s, method="GET", **kwargs)
1135
1136     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1137         """Get data from Keep.
1138
1139         This method fetches one or more blocks of data from Keep.  It
1140         sends a request each Keep service registered with the API
1141         server (or the proxy provided when this client was
1142         instantiated), then each service named in location hints, in
1143         sequence.  As soon as one service provides the data, it's
1144         returned.
1145
1146         Arguments:
1147         * loc_s: A string of one or more comma-separated locators to fetch.
1148           This method returns the concatenation of these blocks.
1149         * num_retries: The number of times to retry GET requests to
1150           *each* Keep server if it returns temporary failures, with
1151           exponential backoff.  Note that, in each loop, the method may try
1152           to fetch data from every available Keep service, along with any
1153           that are named in location hints in the locator.  The default value
1154           is set when the KeepClient is initialized.
1155         """
1156         if ',' in loc_s:
1157             return ''.join(self.get(x) for x in loc_s.split(','))
1158
1159         self.get_counter.add(1)
1160
1161         request_id = (request_id or
1162                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1163                       arvados.util.new_request_id())
1164         if headers is None:
1165             headers = {}
1166         headers['X-Request-Id'] = request_id
1167
1168         slot = None
1169         blob = None
1170         try:
1171             locator = KeepLocator(loc_s)
1172             if method == "GET":
1173                 while slot is None:
1174                     slot, first = self.block_cache.reserve_cache(locator.md5sum)
1175                     if first:
1176                         # Fresh and empty "first time it is used" slot
1177                         break
1178                     if prefetch:
1179                         # this is request for a prefetch to fill in
1180                         # the cache, don't need to wait for the
1181                         # result, so if it is already in flight return
1182                         # immediately.  Clear 'slot' to prevent
1183                         # finally block from calling slot.set()
1184                         if slot.ready.is_set():
1185                             slot.get()
1186                         slot = None
1187                         return None
1188
1189                     blob = slot.get()
1190                     if blob is not None:
1191                         self.hits_counter.add(1)
1192                         return blob
1193
1194                     # If blob is None, this means either
1195                     #
1196                     # (a) another thread was fetching this block and
1197                     # failed with an error or
1198                     #
1199                     # (b) cache thrashing caused the slot to be
1200                     # evicted (content set to None) by another thread
1201                     # between the call to reserve_cache() and get().
1202                     #
1203                     # We'll handle these cases by reserving a new slot
1204                     # and then doing a full GET request.
1205                     slot = None
1206
1207             self.misses_counter.add(1)
1208
1209             # If the locator has hints specifying a prefix (indicating a
1210             # remote keepproxy) or the UUID of a local gateway service,
1211             # read data from the indicated service(s) instead of the usual
1212             # list of local disk services.
1213             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1214                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1215             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1216                                for hint in locator.hints if (
1217                                        hint.startswith('K@') and
1218                                        len(hint) == 29 and
1219                                        self._gateway_services.get(hint[2:])
1220                                        )])
1221             # Map root URLs to their KeepService objects.
1222             roots_map = {
1223                 root: self.KeepService(root, self._user_agent_pool,
1224                                        upload_counter=self.upload_counter,
1225                                        download_counter=self.download_counter,
1226                                        headers=headers,
1227                                        insecure=self.insecure)
1228                 for root in hint_roots
1229             }
1230
1231             # See #3147 for a discussion of the loop implementation.  Highlights:
1232             # * Refresh the list of Keep services after each failure, in case
1233             #   it's being updated.
1234             # * Retry until we succeed, we're out of retries, or every available
1235             #   service has returned permanent failure.
1236             sorted_roots = []
1237             roots_map = {}
1238             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1239                                    backoff_start=2)
1240             for tries_left in loop:
1241                 try:
1242                     sorted_roots = self.map_new_services(
1243                         roots_map, locator,
1244                         force_rebuild=(tries_left < num_retries),
1245                         need_writable=False,
1246                         headers=headers)
1247                 except Exception as error:
1248                     loop.save_result(error)
1249                     continue
1250
1251                 # Query KeepService objects that haven't returned
1252                 # permanent failure, in our specified shuffle order.
1253                 services_to_try = [roots_map[root]
1254                                    for root in sorted_roots
1255                                    if roots_map[root].usable()]
1256                 for keep_service in services_to_try:
1257                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1258                     if blob is not None:
1259                         break
1260                 loop.save_result((blob, len(services_to_try)))
1261
1262             # Always cache the result, then return it if we succeeded.
1263             if loop.success():
1264                 return blob
1265         finally:
1266             if slot is not None:
1267                 self.block_cache.set(slot, blob)
1268
1269         # Q: Including 403 is necessary for the Keep tests to continue
1270         # passing, but maybe they should expect KeepReadError instead?
1271         not_founds = sum(1 for key in sorted_roots
1272                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1273         service_errors = ((key, roots_map[key].last_result()['error'])
1274                           for key in sorted_roots)
1275         if not roots_map:
1276             raise arvados.errors.KeepReadError(
1277                 "[{}] failed to read {}: no Keep services available ({})".format(
1278                     request_id, loc_s, loop.last_result()))
1279         elif not_founds == len(sorted_roots):
1280             raise arvados.errors.NotFoundError(
1281                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1282         else:
1283             raise arvados.errors.KeepReadError(
1284                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1285
1286     @retry.retry_method
1287     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1288         """Save data in Keep.
1289
1290         This method will get a list of Keep services from the API server, and
1291         send the data to each one simultaneously in a new thread.  Once the
1292         uploads are finished, if enough copies are saved, this method returns
1293         the most recent HTTP response body.  If requests fail to upload
1294         enough copies, this method raises KeepWriteError.
1295
1296         Arguments:
1297         * data: The string of data to upload.
1298         * copies: The number of copies that the user requires be saved.
1299           Default 2.
1300         * num_retries: The number of times to retry PUT requests to
1301           *each* Keep server if it returns temporary failures, with
1302           exponential backoff.  The default value is set when the
1303           KeepClient is initialized.
1304         * classes: An optional list of storage class names where copies should
1305           be written.
1306         """
1307
1308         classes = classes or self._default_classes
1309
1310         if not isinstance(data, bytes):
1311             data = data.encode()
1312
1313         self.put_counter.add(1)
1314
1315         data_hash = hashlib.md5(data).hexdigest()
1316         loc_s = data_hash + '+' + str(len(data))
1317         if copies < 1:
1318             return loc_s
1319         locator = KeepLocator(loc_s)
1320
1321         request_id = (request_id or
1322                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1323                       arvados.util.new_request_id())
1324         headers = {
1325             'X-Request-Id': request_id,
1326             'X-Keep-Desired-Replicas': str(copies),
1327         }
1328         roots_map = {}
1329         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1330                                backoff_start=2)
1331         done_copies = 0
1332         done_classes = []
1333         for tries_left in loop:
1334             try:
1335                 sorted_roots = self.map_new_services(
1336                     roots_map, locator,
1337                     force_rebuild=(tries_left < num_retries),
1338                     need_writable=True,
1339                     headers=headers)
1340             except Exception as error:
1341                 loop.save_result(error)
1342                 continue
1343
1344             pending_classes = []
1345             if done_classes is not None:
1346                 pending_classes = list(set(classes) - set(done_classes))
1347             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1348                                                         data_hash=data_hash,
1349                                                         copies=copies - done_copies,
1350                                                         max_service_replicas=self.max_replicas_per_service,
1351                                                         timeout=self.current_timeout(num_retries - tries_left),
1352                                                         classes=pending_classes)
1353             for service_root, ks in [(root, roots_map[root])
1354                                      for root in sorted_roots]:
1355                 if ks.finished():
1356                     continue
1357                 writer_pool.add_task(ks, service_root)
1358             writer_pool.join()
1359             pool_copies, pool_classes = writer_pool.done()
1360             done_copies += pool_copies
1361             if (done_classes is not None) and (pool_classes is not None):
1362                 done_classes += pool_classes
1363                 loop.save_result(
1364                     (done_copies >= copies and set(done_classes) == set(classes),
1365                     writer_pool.total_task_nr))
1366             else:
1367                 # Old keepstore contacted without storage classes support:
1368                 # success is determined only by successful copies.
1369                 #
1370                 # Disable storage classes tracking from this point forward.
1371                 if not self._storage_classes_unsupported_warning:
1372                     self._storage_classes_unsupported_warning = True
1373                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1374                 done_classes = None
1375                 loop.save_result(
1376                     (done_copies >= copies, writer_pool.total_task_nr))
1377
1378         if loop.success():
1379             return writer_pool.response()
1380         if not roots_map:
1381             raise arvados.errors.KeepWriteError(
1382                 "[{}] failed to write {}: no Keep services available ({})".format(
1383                     request_id, data_hash, loop.last_result()))
1384         else:
1385             service_errors = ((key, roots_map[key].last_result()['error'])
1386                               for key in sorted_roots
1387                               if roots_map[key].last_result()['error'])
1388             raise arvados.errors.KeepWriteError(
1389                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1390                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1391
1392     def _block_prefetch_worker(self):
1393         """The background downloader thread."""
1394         while True:
1395             try:
1396                 b = self._prefetch_queue.get()
1397                 if b is None:
1398                     return
1399                 self.get(b, prefetch=True)
1400             except Exception:
1401                 _logger.exception("Exception doing block prefetch")
1402
1403     def _start_prefetch_threads(self):
1404         if self._prefetch_threads is None:
1405             with self.lock:
1406                 if self._prefetch_threads is not None:
1407                     return
1408                 self._prefetch_queue = queue.Queue()
1409                 self._prefetch_threads = []
1410                 for i in range(0, self.num_prefetch_threads):
1411                     thread = threading.Thread(target=self._block_prefetch_worker)
1412                     self._prefetch_threads.append(thread)
1413                     thread.daemon = True
1414                     thread.start()
1415
1416     def block_prefetch(self, locator):
1417         """
1418         This relies on the fact that KeepClient implements a block cache,
1419         so repeated requests for the same block will not result in repeated
1420         downloads (unless the block is evicted from the cache.)  This method
1421         does not block.
1422         """
1423
1424         if self.block_cache.get(locator) is not None:
1425             return
1426
1427         self._start_prefetch_threads()
1428         self._prefetch_queue.put(locator)
1429
1430     def stop_prefetch_threads(self):
1431         with self.lock:
1432             if self._prefetch_threads is not None:
1433                 for t in self._prefetch_threads:
1434                     self._prefetch_queue.put(None)
1435                 for t in self._prefetch_threads:
1436                     t.join()
1437             self._prefetch_threads = None
1438             self._prefetch_queue = None
1439
1440     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1441         """A stub for put().
1442
1443         This method is used in place of the real put() method when
1444         using local storage (see constructor's local_store argument).
1445
1446         copies and num_retries arguments are ignored: they are here
1447         only for the sake of offering the same call signature as
1448         put().
1449
1450         Data stored this way can be retrieved via local_store_get().
1451         """
1452         md5 = hashlib.md5(data).hexdigest()
1453         locator = '%s+%d' % (md5, len(data))
1454         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1455             f.write(data)
1456         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1457                   os.path.join(self.local_store, md5))
1458         return locator
1459
1460     def local_store_get(self, loc_s, num_retries=None):
1461         """Companion to local_store_put()."""
1462         try:
1463             locator = KeepLocator(loc_s)
1464         except ValueError:
1465             raise arvados.errors.NotFoundError(
1466                 "Invalid data locator: '%s'" % loc_s)
1467         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1468             return b''
1469         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1470             return f.read()
1471
1472     def local_store_head(self, loc_s, num_retries=None):
1473         """Companion to local_store_put()."""
1474         try:
1475             locator = KeepLocator(loc_s)
1476         except ValueError:
1477             raise arvados.errors.NotFoundError(
1478                 "Invalid data locator: '%s'" % loc_s)
1479         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1480             return True
1481         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1482             return True