Merge branch '21678-installer-diagnostics-internal'. Closes #21678
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import copy
6 import collections
7 import datetime
8 import hashlib
9 import errno
10 import io
11 import logging
12 import math
13 import os
14 import pycurl
15 import queue
16 import re
17 import socket
18 import ssl
19 import sys
20 import threading
21 import resource
22 import urllib.parse
23 import traceback
24 import weakref
25
26 from io import BytesIO
27
28 import arvados
29 import arvados.config as config
30 import arvados.errors
31 import arvados.retry as retry
32 import arvados.util
33 import arvados.diskcache
34 from arvados._pycurlhelper import PyCurlHelper
35 from . import timer
36
37 _logger = logging.getLogger('arvados.keep')
38 global_client_object = None
39
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
#
# Only attributes that the socket module does not already define are added;
# constants provided by the running interpreter are left untouched.  Nothing
# happens on platforms other than darwin.
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
49
class KeepLocator(object):
    """Parse, validate and re-serialize a Keep block locator.

    A locator string has the form ``md5sum[+size[+hint...]]``.  Hints
    starting with ``A`` are parsed as permission hints
    (``A<signature>@<hex expiry timestamp>``); all other hints are kept
    verbatim, in their original order, in ``self.hints``.
    """
    # Naive datetime for the Unix epoch.  Permission expiry times are
    # stored as naive datetimes measured on the same UTC clock.
    EPOCH_DATETIME = datetime.datetime(1970, 1, 1)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse ``locator_str`` into md5sum, size, permission and hints.

        Raises ValueError if the md5sum is not a 32-digit hex string, the
        size is not an integer, or any hint is malformed.
        """
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # A bare md5sum with no size component is allowed.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        """Return the full locator string, omitting absent components."""
        return '+'.join(
            str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without any hints (``md5sum[+size]``)."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        """Permission expiry as a naive UTC datetime, or None if unset."""
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # ``value`` is the hex Unix timestamp taken from the locator text.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        # Epoch arithmetic yields the same naive UTC datetime that
        # utcfromtimestamp() would, without relying on that deprecated call.
        self._perm_expiry = self.EPOCH_DATETIME + datetime.timedelta(
            seconds=int(value, 16))

    def permission_hint(self):
        """Return the ``A<sig>@<expiry>`` hint, or None if incomplete."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Set perm_sig and perm_expiry from a hint like ``A<sig>@<expiry>``.

        Raises ValueError if the hint has no ``@`` separator, or if either
        part fails the validation done by the property setters.
        """
        # A missing '@' used to surface as a generic tuple-unpacking
        # ValueError (the old ``except IndexError`` never matched); check
        # for the separator explicitly so the intended message is raised.
        sig, sep, expiry = s[1:].partition('@')
        if not sep:
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig, self.perm_expiry = sig, expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired as of ``as_of_dt``.

        ``as_of_dt`` is compared directly against the naive UTC expiry time;
        when omitted, the current UTC time is used.  A locator without a
        permission expiry never counts as expired.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry is naive UTC, so "now" must be naive UTC too;
            # local wall-clock time would skew the result by the UTC offset.
            as_of_dt = datetime.datetime.now(
                datetime.timezone.utc).replace(tzinfo=None)
        return self.perm_expiry <= as_of_dt
132
133
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    # Configuration snapshot that the current global client was built for.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if settings changed."""
        global global_client_object
        # KeepClient used to adapt to these settings at runtime.  To
        # emulate that, take a snapshot of them on every call and build a
        # fresh KeepClient whenever the snapshot differs from the last one.
        snapshot = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        needs_rebuild = (global_client_object is None) or (cls._last_key != snapshot)
        if needs_rebuild:
            global_client_object = KeepClient()
            cls._last_key = snapshot
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block through the shared KeepClient."""
        client = Keep.global_client_object()
        return client.get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block through the shared KeepClient."""
        client = Keep.global_client_object()
        return client.put(data, **kwargs)
167
class KeepBlockCache(object):
    """Cache of Keep blocks bounded by total bytes and by slot count.

    Blocks are held either in RAM (``CacheSlot``) or, when ``disk_cache``
    is true, in ``arvados.diskcache.DiskCacheSlot`` objects.  The cache is
    an ordered mapping of locator to slot; slots nearest the front are
    evicted first and a lookup hit moves the slot to the end.

    ``cache_total`` and the slot mapping are only modified while holding
    ``_cache_lock`` (``_cache_updating`` is a condition on the same lock).
    """

    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        """Set up the cache; a zero limit means "pick a default".

        cache_max -- maximum total bytes; raised to at least 64 MiB.
        max_slots -- maximum number of cached blocks.
        disk_cache -- use the on-disk cache instead of RAM.
        disk_cache_dir -- cache directory; defaults to
            ~/.cache/arvados/keep (created if needed) when disk_cache is set.
        """
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles.  This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                # Floor division keeps cache_max an integer byte count.
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) // 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        # Never go below the size of one 64 MiB block.
        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        self.cache_total = 0
        if self._disk_cache:
            # Adopt whatever is already in the cache directory, then trim
            # it down to the configured limits.
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()

    class CacheSlot(object):
        """In-memory slot for one block; readers wait until it is filled."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until the slot is ready, then return its content."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store value and mark the slot ready.

            Returns False (and changes nothing) if content is already set.
            """
            if self.content is not None:
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            """Return the content length in bytes, or 0 when empty."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            """Drop the content held by this slot."""
            self.content = None


    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.  Caller must hold the cache lock.

        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
            return

        _evict_candidates = collections.deque(self._cache.values())
        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                # Still being filled; it cannot be evicted yet.
                continue

            sz = slot.size()
            slot.evict()
            self.cache_total -= sz
            del self._cache[slot.locator]


    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            # Wake threads waiting in reserve_cache() for a free slot.
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Look up a slot by locator.  Caller must hold the cache lock.
        # Test if the locator is already in the cache
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                # Ready but empty: drop the entry and report a miss.
                del self._cache[n.locator]
                return None
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                self.cache_total += n.size()
                return n
        return None

    def get(self, locator):
        """Return the cache slot for locator, or None if not cached."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns a ``(slot, created)`` tuple; ``created`` is True when a
        new, still-empty slot was added for the caller to fill.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

    def _store(self, slot, blob):
        # Fill the slot and account for its size.  The counter is updated
        # under the cache lock, like every other change to cache_total, so
        # concurrent set() calls and evictions cannot lose an update.
        # Must be called WITHOUT the cache lock held (it is not reentrant).
        if slot.set(blob):
            with self._cache_lock:
                self.cache_total += slot.size()

    def set(self, slot, blob):
        """Store blob in slot, shrinking the cache limits and retrying once
        if the first attempt fails.

        Raises arvados.errors.KeepCacheError if the retry also fails; in
        that case the slot is set to None first.
        """
        try:
            self._store(slot, blob)
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache.values())
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception:
            # Still best-effort (the retry below decides the outcome), but
            # leave a trace of what went wrong the first time.
            _logger.debug("Error saving block %s to cache, will retry",
                          slot.locator, exc_info=True)
        finally:
            # Check if we should evict things from the cache.  Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which would make the operation
            # succeed.
            self._store(slot, blob)
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e)) from e

        self.cap_cache()
369
class Counter(object):
    """Accumulator whose reads and updates are serialized by a lock."""

    def __init__(self, v=0):
        self._val = v
        self._lk = threading.Lock()

    def add(self, v):
        """Add v to the running value."""
        self._lk.acquire()
        try:
            self._val += v
        finally:
            self._lk.release()

    def get(self):
        """Return the current value."""
        self._lk.acquire()
        try:
            current = self._val
        finally:
            self._lk.release()
        return current
382
383
384 class KeepClient(object):
385     DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
386     DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
387
388     class KeepService(PyCurlHelper):
389         """Make requests to a single Keep service, and track results.
390
391         A KeepService is intended to last long enough to perform one
392         transaction (GET or PUT) against one Keep service. This can
393         involve calling either get() or put() multiple times in order
394         to retry after transient failures. However, calling both get()
395         and put() on a single instance -- or using the same instance
396         to access two different Keep services -- will not produce
397         sensible behavior.
398         """
399
400         HTTP_ERRORS = (
401             socket.error,
402             ssl.SSLError,
403             arvados.errors.HttpError,
404         )
405
406         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
407                      upload_counter=None,
408                      download_counter=None,
409                      headers={},
410                      insecure=False):
411             super(KeepClient.KeepService, self).__init__()
412             self.root = root
413             self._user_agent_pool = user_agent_pool
414             self._result = {'error': None}
415             self._usable = True
416             self._session = None
417             self._socket = None
418             self.get_headers = {'Accept': 'application/octet-stream'}
419             self.get_headers.update(headers)
420             self.put_headers = headers
421             self.upload_counter = upload_counter
422             self.download_counter = download_counter
423             self.insecure = insecure
424
425         def usable(self):
426             """Is it worth attempting a request?"""
427             return self._usable
428
429         def finished(self):
430             """Did the request succeed or encounter permanent failure?"""
431             return self._result['error'] == False or not self._usable
432
433         def last_result(self):
434             return self._result
435
436         def _get_user_agent(self):
437             try:
438                 return self._user_agent_pool.get(block=False)
439             except queue.Empty:
440                 return pycurl.Curl()
441
442         def _put_user_agent(self, ua):
443             try:
444                 ua.reset()
445                 self._user_agent_pool.put(ua, block=False)
446             except:
447                 ua.close()
448
449         def get(self, locator, method="GET", timeout=None):
450             # locator is a KeepLocator object.
451             url = self.root + str(locator)
452             _logger.debug("Request: %s %s", method, url)
453             curl = self._get_user_agent()
454             ok = None
455             try:
456                 with timer.Timer() as t:
457                     self._headers = {}
458                     response_body = BytesIO()
459                     curl.setopt(pycurl.NOSIGNAL, 1)
460                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
461                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
462                     curl.setopt(pycurl.URL, url.encode('utf-8'))
463                     curl.setopt(pycurl.HTTPHEADER, [
464                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
465                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
466                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
467                     if self.insecure:
468                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
469                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
470                     else:
471                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
472                     if method == "HEAD":
473                         curl.setopt(pycurl.NOBODY, True)
474                     else:
475                         curl.setopt(pycurl.HTTPGET, True)
476                     self._setcurltimeouts(curl, timeout, method=="HEAD")
477
478                     try:
479                         curl.perform()
480                     except Exception as e:
481                         raise arvados.errors.HttpError(0, str(e))
482                     finally:
483                         if self._socket:
484                             self._socket.close()
485                             self._socket = None
486                     self._result = {
487                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
488                         'body': response_body.getvalue(),
489                         'headers': self._headers,
490                         'error': False,
491                     }
492
493                 ok = retry.check_http_response_success(self._result['status_code'])
494                 if not ok:
495                     self._result['error'] = arvados.errors.HttpError(
496                         self._result['status_code'],
497                         self._headers.get('x-status-line', 'Error'))
498             except self.HTTP_ERRORS as e:
499                 self._result = {
500                     'error': e,
501                 }
502             self._usable = ok != False
503             if self._result.get('status_code', None):
504                 # The client worked well enough to get an HTTP status
505                 # code, so presumably any problems are just on the
506                 # server side and it's OK to reuse the client.
507                 self._put_user_agent(curl)
508             else:
509                 # Don't return this client to the pool, in case it's
510                 # broken.
511                 curl.close()
512             if not ok:
513                 _logger.debug("Request fail: GET %s => %s: %s",
514                               url, type(self._result['error']), str(self._result['error']))
515                 return None
516             if method == "HEAD":
517                 _logger.info("HEAD %s: %s bytes",
518                          self._result['status_code'],
519                          self._result.get('content-length'))
520                 if self._result['headers'].get('x-keep-locator'):
521                     # This is a response to a remote block copy request, return
522                     # the local copy block locator.
523                     return self._result['headers'].get('x-keep-locator')
524                 return True
525
526             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
527                          self._result['status_code'],
528                          len(self._result['body']),
529                          t.msecs,
530                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
531
532             if self.download_counter:
533                 self.download_counter.add(len(self._result['body']))
534             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
535             if resp_md5 != locator.md5sum:
536                 _logger.warning("Checksum fail: md5(%s) = %s",
537                                 url, resp_md5)
538                 self._result['error'] = arvados.errors.HttpError(
539                     0, 'Checksum fail')
540                 return None
541             return self._result['body']
542
543         def put(self, hash_s, body, timeout=None, headers={}):
544             put_headers = copy.copy(self.put_headers)
545             put_headers.update(headers)
546             url = self.root + hash_s
547             _logger.debug("Request: PUT %s", url)
548             curl = self._get_user_agent()
549             ok = None
550             try:
551                 with timer.Timer() as t:
552                     self._headers = {}
553                     body_reader = BytesIO(body)
554                     response_body = BytesIO()
555                     curl.setopt(pycurl.NOSIGNAL, 1)
556                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
557                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
558                     curl.setopt(pycurl.URL, url.encode('utf-8'))
559                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
560                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
561                     # response) instead of sending the request body immediately.
562                     # This allows the server to reject the request if the request
563                     # is invalid or the server is read-only, without waiting for
564                     # the client to send the entire block.
565                     curl.setopt(pycurl.UPLOAD, True)
566                     curl.setopt(pycurl.INFILESIZE, len(body))
567                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
568                     curl.setopt(pycurl.HTTPHEADER, [
569                         '{}: {}'.format(k,v) for k,v in put_headers.items()])
570                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
571                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
572                     if self.insecure:
573                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
574                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
575                     else:
576                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
577                     self._setcurltimeouts(curl, timeout)
578                     try:
579                         curl.perform()
580                     except Exception as e:
581                         raise arvados.errors.HttpError(0, str(e))
582                     finally:
583                         if self._socket:
584                             self._socket.close()
585                             self._socket = None
586                     self._result = {
587                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
588                         'body': response_body.getvalue().decode('utf-8'),
589                         'headers': self._headers,
590                         'error': False,
591                     }
592                 ok = retry.check_http_response_success(self._result['status_code'])
593                 if not ok:
594                     self._result['error'] = arvados.errors.HttpError(
595                         self._result['status_code'],
596                         self._headers.get('x-status-line', 'Error'))
597             except self.HTTP_ERRORS as e:
598                 self._result = {
599                     'error': e,
600                 }
601             self._usable = ok != False # still usable if ok is True or None
602             if self._result.get('status_code', None):
603                 # Client is functional. See comment in get().
604                 self._put_user_agent(curl)
605             else:
606                 curl.close()
607             if not ok:
608                 _logger.debug("Request fail: PUT %s => %s: %s",
609                               url, type(self._result['error']), str(self._result['error']))
610                 return False
611             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
612                          self._result['status_code'],
613                          len(body),
614                          t.msecs,
615                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
616             if self.upload_counter:
617                 self.upload_counter.add(len(body))
618             return True
619
620
621     class KeepWriterQueue(queue.Queue):
622         def __init__(self, copies, classes=[]):
623             queue.Queue.__init__(self) # Old-style superclass
624             self.wanted_copies = copies
625             self.wanted_storage_classes = classes
626             self.successful_copies = 0
627             self.confirmed_storage_classes = {}
628             self.response = None
629             self.storage_classes_tracking = True
630             self.queue_data_lock = threading.RLock()
631             self.pending_tries = max(copies, len(classes))
632             self.pending_tries_notification = threading.Condition()
633
634         def write_success(self, response, replicas_nr, classes_confirmed):
635             with self.queue_data_lock:
636                 self.successful_copies += replicas_nr
637                 if classes_confirmed is None:
638                     self.storage_classes_tracking = False
639                 elif self.storage_classes_tracking:
640                     for st_class, st_copies in classes_confirmed.items():
641                         try:
642                             self.confirmed_storage_classes[st_class] += st_copies
643                         except KeyError:
644                             self.confirmed_storage_classes[st_class] = st_copies
645                     self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
646                 self.response = response
647             with self.pending_tries_notification:
648                 self.pending_tries_notification.notify_all()
649
650         def write_fail(self, ks):
651             with self.pending_tries_notification:
652                 self.pending_tries += 1
653                 self.pending_tries_notification.notify()
654
655         def pending_copies(self):
656             with self.queue_data_lock:
657                 return self.wanted_copies - self.successful_copies
658
659         def satisfied_classes(self):
660             with self.queue_data_lock:
661                 if not self.storage_classes_tracking:
662                     # Notifies disabled storage classes expectation to
663                     # the outer loop.
664                     return None
665             return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
666
667         def pending_classes(self):
668             with self.queue_data_lock:
669                 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
670                     return []
671                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
672                 for st_class, st_copies in self.confirmed_storage_classes.items():
673                     if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
674                         unsatisfied_classes.remove(st_class)
675                 return unsatisfied_classes
676
677         def get_next_task(self):
678             with self.pending_tries_notification:
679                 while True:
680                     if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
681                         # This notify_all() is unnecessary --
682                         # write_success() already called notify_all()
683                         # when pending<1 became true, so it's not
684                         # possible for any other thread to be in
685                         # wait() now -- but it's cheap insurance
686                         # against deadlock so we do it anyway:
687                         self.pending_tries_notification.notify_all()
688                         # Drain the queue and then raise Queue.Empty
689                         while True:
690                             self.get_nowait()
691                             self.task_done()
692                     elif self.pending_tries > 0:
693                         service, service_root = self.get_nowait()
694                         if service.finished():
695                             self.task_done()
696                             continue
697                         self.pending_tries -= 1
698                         return service, service_root
699                     elif self.empty():
700                         self.pending_tries_notification.notify_all()
701                         raise queue.Empty
702                     else:
703                         self.pending_tries_notification.wait()
704
705
706     class KeepWriterThreadPool(object):
707         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
708             self.total_task_nr = 0
709             if (not max_service_replicas) or (max_service_replicas >= copies):
710                 num_threads = 1
711             else:
712                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
713             _logger.debug("Pool max threads is %d", num_threads)
714             self.workers = []
715             self.queue = KeepClient.KeepWriterQueue(copies, classes)
716             # Create workers
717             for _ in range(num_threads):
718                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
719                 self.workers.append(w)
720
721         def add_task(self, ks, service_root):
722             self.queue.put((ks, service_root))
723             self.total_task_nr += 1
724
725         def done(self):
726             return self.queue.successful_copies, self.queue.satisfied_classes()
727
728         def join(self):
729             # Start workers
730             for worker in self.workers:
731                 worker.start()
732             # Wait for finished work
733             self.queue.join()
734
735         def response(self):
736             return self.queue.response
737
738
739     class KeepWriterThread(threading.Thread):
740         class TaskFailed(RuntimeError): pass
741
742         def __init__(self, queue, data, data_hash, timeout=None):
743             super(KeepClient.KeepWriterThread, self).__init__()
744             self.timeout = timeout
745             self.queue = queue
746             self.data = data
747             self.data_hash = data_hash
748             self.daemon = True
749
750         def run(self):
751             while True:
752                 try:
753                     service, service_root = self.queue.get_next_task()
754                 except queue.Empty:
755                     return
756                 try:
757                     locator, copies, classes = self.do_task(service, service_root)
758                 except Exception as e:
759                     if not isinstance(e, self.TaskFailed):
760                         _logger.exception("Exception in KeepWriterThread")
761                     self.queue.write_fail(service)
762                 else:
763                     self.queue.write_success(locator, copies, classes)
764                 finally:
765                     self.queue.task_done()
766
767         def do_task(self, service, service_root):
768             classes = self.queue.pending_classes()
769             headers = {}
770             if len(classes) > 0:
771                 classes.sort()
772                 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
773             success = bool(service.put(self.data_hash,
774                                         self.data,
775                                         timeout=self.timeout,
776                                         headers=headers))
777             result = service.last_result()
778
779             if not success:
780                 if result.get('status_code'):
781                     _logger.debug("Request fail: PUT %s => %s %s",
782                                   self.data_hash,
783                                   result.get('status_code'),
784                                   result.get('body'))
785                 raise self.TaskFailed()
786
787             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
788                           str(threading.current_thread()),
789                           self.data_hash,
790                           len(self.data),
791                           service_root)
792             try:
793                 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
794             except (KeyError, ValueError):
795                 replicas_stored = 1
796
797             classes_confirmed = {}
798             try:
799                 scch = result['headers']['x-keep-storage-classes-confirmed']
800                 for confirmation in scch.replace(' ', '').split(','):
801                     if '=' in confirmation:
802                         stored_class, stored_copies = confirmation.split('=')[:2]
803                         classes_confirmed[stored_class] = int(stored_copies)
804             except (KeyError, ValueError):
805                 # Storage classes confirmed header missing or corrupt
806                 classes_confirmed = None
807
808             return result['body'].strip(), replicas_stored, classes_confirmed
809
810
811     def __init__(self, api_client=None, proxy=None,
812                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
813                  api_token=None, local_store=None, block_cache=None,
814                  num_retries=10, session=None, num_prefetch_threads=None):
815         """Initialize a new KeepClient.
816
817         Arguments:
818         :api_client:
819           The API client to use to find Keep services.  If not
820           provided, KeepClient will build one from available Arvados
821           configuration.
822
823         :proxy:
824           If specified, this KeepClient will send requests to this Keep
825           proxy.  Otherwise, KeepClient will fall back to the setting of the
826           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
827           If you want to KeepClient does not use a proxy, pass in an empty
828           string.
829
830         :timeout:
831           The initial timeout (in seconds) for HTTP requests to Keep
832           non-proxy servers.  A tuple of three floats is interpreted as
833           (connection_timeout, read_timeout, minimum_bandwidth). A connection
834           will be aborted if the average traffic rate falls below
835           minimum_bandwidth bytes per second over an interval of read_timeout
836           seconds. Because timeouts are often a result of transient server
837           load, the actual connection timeout will be increased by a factor
838           of two on each retry.
839           Default: (2, 256, 32768).
840
841         :proxy_timeout:
842           The initial timeout (in seconds) for HTTP requests to
843           Keep proxies. A tuple of three floats is interpreted as
844           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
845           described above for adjusting connection timeouts on retry also
846           applies.
847           Default: (20, 256, 32768).
848
849         :api_token:
850           If you're not using an API client, but only talking
851           directly to a Keep proxy, this parameter specifies an API token
852           to authenticate Keep requests.  It is an error to specify both
853           api_client and api_token.  If you specify neither, KeepClient
854           will use one available from the Arvados configuration.
855
856         :local_store:
857           If specified, this KeepClient will bypass Keep
858           services, and save data to the named directory.  If unspecified,
859           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
860           environment variable.  If you want to ensure KeepClient does not
861           use local storage, pass in an empty string.  This is primarily
862           intended to mock a server for testing.
863
864         :num_retries:
865           The default number of times to retry failed requests.
866           This will be used as the default num_retries value when get() and
867           put() are called.  Default 10.
868         """
869         self.lock = threading.Lock()
870         if proxy is None:
871             if config.get('ARVADOS_KEEP_SERVICES'):
872                 proxy = config.get('ARVADOS_KEEP_SERVICES')
873             else:
874                 proxy = config.get('ARVADOS_KEEP_PROXY')
875         if api_token is None:
876             if api_client is None:
877                 api_token = config.get('ARVADOS_API_TOKEN')
878             else:
879                 api_token = api_client.api_token
880         elif api_client is not None:
881             raise ValueError(
882                 "can't build KeepClient with both API client and token")
883         if local_store is None:
884             local_store = os.environ.get('KEEP_LOCAL_STORE')
885
886         if api_client is None:
887             self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
888         else:
889             self.insecure = api_client.insecure
890
891         self.block_cache = block_cache if block_cache else KeepBlockCache()
892         self.timeout = timeout
893         self.proxy_timeout = proxy_timeout
894         self._user_agent_pool = queue.LifoQueue()
895         self.upload_counter = Counter()
896         self.download_counter = Counter()
897         self.put_counter = Counter()
898         self.get_counter = Counter()
899         self.hits_counter = Counter()
900         self.misses_counter = Counter()
901         self._storage_classes_unsupported_warning = False
902         self._default_classes = []
903         if num_prefetch_threads is not None:
904             self.num_prefetch_threads = num_prefetch_threads
905         else:
906             self.num_prefetch_threads = 2
907         self._prefetch_queue = None
908         self._prefetch_threads = None
909
910         if local_store:
911             self.local_store = local_store
912             self.head = self.local_store_head
913             self.get = self.local_store_get
914             self.put = self.local_store_put
915         else:
916             self.num_retries = num_retries
917             self.max_replicas_per_service = None
918             if proxy:
919                 proxy_uris = proxy.split()
920                 for i in range(len(proxy_uris)):
921                     if not proxy_uris[i].endswith('/'):
922                         proxy_uris[i] += '/'
923                     # URL validation
924                     url = urllib.parse.urlparse(proxy_uris[i])
925                     if not (url.scheme and url.netloc):
926                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
927                 self.api_token = api_token
928                 self._gateway_services = {}
929                 self._keep_services = [{
930                     'uuid': "00000-bi6l4-%015d" % idx,
931                     'service_type': 'proxy',
932                     '_service_root': uri,
933                     } for idx, uri in enumerate(proxy_uris)]
934                 self._writable_services = self._keep_services
935                 self.using_proxy = True
936                 self._static_services_list = True
937             else:
938                 # It's important to avoid instantiating an API client
939                 # unless we actually need one, for testing's sake.
940                 if api_client is None:
941                     api_client = arvados.api('v1')
942                 self.api_client = api_client
943                 self.api_token = api_client.api_token
944                 self._gateway_services = {}
945                 self._keep_services = None
946                 self._writable_services = None
947                 self.using_proxy = None
948                 self._static_services_list = False
949                 try:
950                     self._default_classes = [
951                         k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
952                 except KeyError:
953                     # We're talking to an old cluster
954                     pass
955
956     def current_timeout(self, attempt_number):
957         """Return the appropriate timeout to use for this client.
958
959         The proxy timeout setting if the backend service is currently a proxy,
960         the regular timeout setting otherwise.  The `attempt_number` indicates
961         how many times the operation has been tried already (starting from 0
962         for the first try), and scales the connection timeout portion of the
963         return value accordingly.
964
965         """
966         # TODO(twp): the timeout should be a property of a
967         # KeepService, not a KeepClient. See #4488.
968         t = self.proxy_timeout if self.using_proxy else self.timeout
969         if len(t) == 2:
970             return (t[0] * (1 << attempt_number), t[1])
971         else:
972             return (t[0] * (1 << attempt_number), t[1], t[2])
973     def _any_nondisk_services(self, service_list):
974         return any(ks.get('service_type', 'disk') != 'disk'
975                    for ks in service_list)
976
977     def build_services_list(self, force_rebuild=False):
978         if (self._static_services_list or
979               (self._keep_services and not force_rebuild)):
980             return
981         with self.lock:
982             try:
983                 keep_services = self.api_client.keep_services().accessible()
984             except Exception:  # API server predates Keep services.
985                 keep_services = self.api_client.keep_disks().list()
986
987             # Gateway services are only used when specified by UUID,
988             # so there's nothing to gain by filtering them by
989             # service_type.
990             self._gateway_services = {ks['uuid']: ks for ks in
991                                       keep_services.execute()['items']}
992             if not self._gateway_services:
993                 raise arvados.errors.NoKeepServersError()
994
995             # Precompute the base URI for each service.
996             for r in self._gateway_services.values():
997                 host = r['service_host']
998                 if not host.startswith('[') and host.find(':') >= 0:
999                     # IPv6 URIs must be formatted like http://[::1]:80/...
1000                     host = '[' + host + ']'
1001                 r['_service_root'] = "{}://{}:{:d}/".format(
1002                     'https' if r['service_ssl_flag'] else 'http',
1003                     host,
1004                     r['service_port'])
1005
1006             _logger.debug(str(self._gateway_services))
1007             self._keep_services = [
1008                 ks for ks in self._gateway_services.values()
1009                 if not ks.get('service_type', '').startswith('gateway:')]
1010             self._writable_services = [ks for ks in self._keep_services
1011                                        if not ks.get('read_only')]
1012
1013             # For disk type services, max_replicas_per_service is 1
1014             # It is unknown (unlimited) for other service types.
1015             if self._any_nondisk_services(self._writable_services):
1016                 self.max_replicas_per_service = None
1017             else:
1018                 self.max_replicas_per_service = 1
1019
1020     def _service_weight(self, data_hash, service_uuid):
1021         """Compute the weight of a Keep service endpoint for a data
1022         block with a known hash.
1023
1024         The weight is md5(h + u) where u is the last 15 characters of
1025         the service endpoint's UUID.
1026         """
1027         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1028
1029     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1030         """Return an array of Keep service endpoints, in the order in
1031         which they should be probed when reading or writing data with
1032         the given hash+hints.
1033         """
1034         self.build_services_list(force_rebuild)
1035
1036         sorted_roots = []
1037         # Use the services indicated by the given +K@... remote
1038         # service hints, if any are present and can be resolved to a
1039         # URI.
1040         for hint in locator.hints:
1041             if hint.startswith('K@'):
1042                 if len(hint) == 7:
1043                     sorted_roots.append(
1044                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1045                 elif len(hint) == 29:
1046                     svc = self._gateway_services.get(hint[2:])
1047                     if svc:
1048                         sorted_roots.append(svc['_service_root'])
1049
1050         # Sort the available local services by weight (heaviest first)
1051         # for this locator, and return their service_roots (base URIs)
1052         # in that order.
1053         use_services = self._keep_services
1054         if need_writable:
1055             use_services = self._writable_services
1056         self.using_proxy = self._any_nondisk_services(use_services)
1057         sorted_roots.extend([
1058             svc['_service_root'] for svc in sorted(
1059                 use_services,
1060                 reverse=True,
1061                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1062         _logger.debug("{}: {}".format(locator, sorted_roots))
1063         return sorted_roots
1064
1065     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1066         # roots_map is a dictionary, mapping Keep service root strings
1067         # to KeepService objects.  Poll for Keep services, and add any
1068         # new ones to roots_map.  Return the current list of local
1069         # root strings.
1070         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1071         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1072         for root in local_roots:
1073             if root not in roots_map:
1074                 roots_map[root] = self.KeepService(
1075                     root, self._user_agent_pool,
1076                     upload_counter=self.upload_counter,
1077                     download_counter=self.download_counter,
1078                     headers=headers,
1079                     insecure=self.insecure)
1080         return local_roots
1081
1082     @staticmethod
1083     def _check_loop_result(result):
1084         # KeepClient RetryLoops should save results as a 2-tuple: the
1085         # actual result of the request, and the number of servers available
1086         # to receive the request this round.
1087         # This method returns True if there's a real result, False if
1088         # there are no more servers available, otherwise None.
1089         if isinstance(result, Exception):
1090             return None
1091         result, tried_server_count = result
1092         if (result is not None) and (result is not False):
1093             return True
1094         elif tried_server_count < 1:
1095             _logger.info("No more Keep services to try; giving up")
1096             return False
1097         else:
1098             return None
1099
1100     def get_from_cache(self, loc_s):
1101         """Fetch a block only if is in the cache, otherwise return None."""
1102         locator = KeepLocator(loc_s)
1103         slot = self.block_cache.get(locator.md5sum)
1104         if slot is not None and slot.ready.is_set():
1105             return slot.get()
1106         else:
1107             return None
1108
1109     def refresh_signature(self, loc):
1110         """Ask Keep to get the remote block and return its local signature"""
1111         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1112         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1113
1114     @retry.retry_method
1115     def head(self, loc_s, **kwargs):
1116         return self._get_or_head(loc_s, method="HEAD", **kwargs)
1117
1118     @retry.retry_method
1119     def get(self, loc_s, **kwargs):
1120         return self._get_or_head(loc_s, method="GET", **kwargs)
1121
1122     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1123         """Get data from Keep.
1124
1125         This method fetches one or more blocks of data from Keep.  It
1126         sends a request each Keep service registered with the API
1127         server (or the proxy provided when this client was
1128         instantiated), then each service named in location hints, in
1129         sequence.  As soon as one service provides the data, it's
1130         returned.
1131
1132         Arguments:
1133         * loc_s: A string of one or more comma-separated locators to fetch.
1134           This method returns the concatenation of these blocks.
1135         * num_retries: The number of times to retry GET requests to
1136           *each* Keep server if it returns temporary failures, with
1137           exponential backoff.  Note that, in each loop, the method may try
1138           to fetch data from every available Keep service, along with any
1139           that are named in location hints in the locator.  The default value
1140           is set when the KeepClient is initialized.
1141         """
1142         if ',' in loc_s:
1143             return ''.join(self.get(x) for x in loc_s.split(','))
1144
1145         self.get_counter.add(1)
1146
1147         request_id = (request_id or
1148                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1149                       arvados.util.new_request_id())
1150         if headers is None:
1151             headers = {}
1152         headers['X-Request-Id'] = request_id
1153
1154         slot = None
1155         blob = None
1156         try:
1157             locator = KeepLocator(loc_s)
1158             if method == "GET":
1159                 while slot is None:
1160                     slot, first = self.block_cache.reserve_cache(locator.md5sum)
1161                     if first:
1162                         # Fresh and empty "first time it is used" slot
1163                         break
1164                     if prefetch:
1165                         # this is request for a prefetch to fill in
1166                         # the cache, don't need to wait for the
1167                         # result, so if it is already in flight return
1168                         # immediately.  Clear 'slot' to prevent
1169                         # finally block from calling slot.set()
1170                         if slot.ready.is_set():
1171                             slot.get()
1172                         slot = None
1173                         return None
1174
1175                     blob = slot.get()
1176                     if blob is not None:
1177                         self.hits_counter.add(1)
1178                         return blob
1179
1180                     # If blob is None, this means either
1181                     #
1182                     # (a) another thread was fetching this block and
1183                     # failed with an error or
1184                     #
1185                     # (b) cache thrashing caused the slot to be
1186                     # evicted (content set to None) by another thread
1187                     # between the call to reserve_cache() and get().
1188                     #
1189                     # We'll handle these cases by reserving a new slot
1190                     # and then doing a full GET request.
1191                     slot = None
1192
1193             self.misses_counter.add(1)
1194
1195             # If the locator has hints specifying a prefix (indicating a
1196             # remote keepproxy) or the UUID of a local gateway service,
1197             # read data from the indicated service(s) instead of the usual
1198             # list of local disk services.
1199             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1200                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1201             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1202                                for hint in locator.hints if (
1203                                        hint.startswith('K@') and
1204                                        len(hint) == 29 and
1205                                        self._gateway_services.get(hint[2:])
1206                                        )])
1207             # Map root URLs to their KeepService objects.
1208             roots_map = {
1209                 root: self.KeepService(root, self._user_agent_pool,
1210                                        upload_counter=self.upload_counter,
1211                                        download_counter=self.download_counter,
1212                                        headers=headers,
1213                                        insecure=self.insecure)
1214                 for root in hint_roots
1215             }
1216
1217             # See #3147 for a discussion of the loop implementation.  Highlights:
1218             # * Refresh the list of Keep services after each failure, in case
1219             #   it's being updated.
1220             # * Retry until we succeed, we're out of retries, or every available
1221             #   service has returned permanent failure.
1222             sorted_roots = []
1223             roots_map = {}
1224             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1225                                    backoff_start=2)
1226             for tries_left in loop:
1227                 try:
1228                     sorted_roots = self.map_new_services(
1229                         roots_map, locator,
1230                         force_rebuild=(tries_left < num_retries),
1231                         need_writable=False,
1232                         headers=headers)
1233                 except Exception as error:
1234                     loop.save_result(error)
1235                     continue
1236
1237                 # Query KeepService objects that haven't returned
1238                 # permanent failure, in our specified shuffle order.
1239                 services_to_try = [roots_map[root]
1240                                    for root in sorted_roots
1241                                    if roots_map[root].usable()]
1242                 for keep_service in services_to_try:
1243                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1244                     if blob is not None:
1245                         break
1246                 loop.save_result((blob, len(services_to_try)))
1247
1248             # Always cache the result, then return it if we succeeded.
1249             if loop.success():
1250                 return blob
1251         finally:
1252             if slot is not None:
1253                 self.block_cache.set(slot, blob)
1254
1255         # Q: Including 403 is necessary for the Keep tests to continue
1256         # passing, but maybe they should expect KeepReadError instead?
1257         not_founds = sum(1 for key in sorted_roots
1258                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1259         service_errors = ((key, roots_map[key].last_result()['error'])
1260                           for key in sorted_roots)
1261         if not roots_map:
1262             raise arvados.errors.KeepReadError(
1263                 "[{}] failed to read {}: no Keep services available ({})".format(
1264                     request_id, loc_s, loop.last_result()))
1265         elif not_founds == len(sorted_roots):
1266             raise arvados.errors.NotFoundError(
1267                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1268         else:
1269             raise arvados.errors.KeepReadError(
1270                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1271
1272     @retry.retry_method
1273     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1274         """Save data in Keep.
1275
1276         This method will get a list of Keep services from the API server, and
1277         send the data to each one simultaneously in a new thread.  Once the
1278         uploads are finished, if enough copies are saved, this method returns
1279         the most recent HTTP response body.  If requests fail to upload
1280         enough copies, this method raises KeepWriteError.
1281
1282         Arguments:
1283         * data: The string of data to upload.
1284         * copies: The number of copies that the user requires be saved.
1285           Default 2.
1286         * num_retries: The number of times to retry PUT requests to
1287           *each* Keep server if it returns temporary failures, with
1288           exponential backoff.  The default value is set when the
1289           KeepClient is initialized.
1290         * classes: An optional list of storage class names where copies should
1291           be written.
1292         """
1293
1294         classes = classes or self._default_classes
1295
1296         if not isinstance(data, bytes):
1297             data = data.encode()
1298
1299         self.put_counter.add(1)
1300
1301         data_hash = hashlib.md5(data).hexdigest()
1302         loc_s = data_hash + '+' + str(len(data))
1303         if copies < 1:
1304             return loc_s
1305         locator = KeepLocator(loc_s)
1306
1307         request_id = (request_id or
1308                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1309                       arvados.util.new_request_id())
1310         headers = {
1311             'X-Request-Id': request_id,
1312             'X-Keep-Desired-Replicas': str(copies),
1313         }
1314         roots_map = {}
1315         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1316                                backoff_start=2)
1317         done_copies = 0
1318         done_classes = []
1319         for tries_left in loop:
1320             try:
1321                 sorted_roots = self.map_new_services(
1322                     roots_map, locator,
1323                     force_rebuild=(tries_left < num_retries),
1324                     need_writable=True,
1325                     headers=headers)
1326             except Exception as error:
1327                 loop.save_result(error)
1328                 continue
1329
1330             pending_classes = []
1331             if done_classes is not None:
1332                 pending_classes = list(set(classes) - set(done_classes))
1333             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1334                                                         data_hash=data_hash,
1335                                                         copies=copies - done_copies,
1336                                                         max_service_replicas=self.max_replicas_per_service,
1337                                                         timeout=self.current_timeout(num_retries - tries_left),
1338                                                         classes=pending_classes)
1339             for service_root, ks in [(root, roots_map[root])
1340                                      for root in sorted_roots]:
1341                 if ks.finished():
1342                     continue
1343                 writer_pool.add_task(ks, service_root)
1344             writer_pool.join()
1345             pool_copies, pool_classes = writer_pool.done()
1346             done_copies += pool_copies
1347             if (done_classes is not None) and (pool_classes is not None):
1348                 done_classes += pool_classes
1349                 loop.save_result(
1350                     (done_copies >= copies and set(done_classes) == set(classes),
1351                     writer_pool.total_task_nr))
1352             else:
1353                 # Old keepstore contacted without storage classes support:
1354                 # success is determined only by successful copies.
1355                 #
1356                 # Disable storage classes tracking from this point forward.
1357                 if not self._storage_classes_unsupported_warning:
1358                     self._storage_classes_unsupported_warning = True
1359                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1360                 done_classes = None
1361                 loop.save_result(
1362                     (done_copies >= copies, writer_pool.total_task_nr))
1363
1364         if loop.success():
1365             return writer_pool.response()
1366         if not roots_map:
1367             raise arvados.errors.KeepWriteError(
1368                 "[{}] failed to write {}: no Keep services available ({})".format(
1369                     request_id, data_hash, loop.last_result()))
1370         else:
1371             service_errors = ((key, roots_map[key].last_result()['error'])
1372                               for key in sorted_roots
1373                               if roots_map[key].last_result()['error'])
1374             raise arvados.errors.KeepWriteError(
1375                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1376                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1377
1378     def _block_prefetch_worker(self):
1379         """The background downloader thread."""
1380         while True:
1381             try:
1382                 b = self._prefetch_queue.get()
1383                 if b is None:
1384                     return
1385                 self.get(b, prefetch=True)
1386             except Exception:
1387                 _logger.exception("Exception doing block prefetch")
1388
1389     def _start_prefetch_threads(self):
1390         if self._prefetch_threads is None:
1391             with self.lock:
1392                 if self._prefetch_threads is not None:
1393                     return
1394                 self._prefetch_queue = queue.Queue()
1395                 self._prefetch_threads = []
1396                 for i in range(0, self.num_prefetch_threads):
1397                     thread = threading.Thread(target=self._block_prefetch_worker)
1398                     self._prefetch_threads.append(thread)
1399                     thread.daemon = True
1400                     thread.start()
1401
1402     def block_prefetch(self, locator):
1403         """
1404         This relies on the fact that KeepClient implements a block cache,
1405         so repeated requests for the same block will not result in repeated
1406         downloads (unless the block is evicted from the cache.)  This method
1407         does not block.
1408         """
1409
1410         if self.block_cache.get(locator) is not None:
1411             return
1412
1413         self._start_prefetch_threads()
1414         self._prefetch_queue.put(locator)
1415
1416     def stop_prefetch_threads(self):
1417         with self.lock:
1418             if self._prefetch_threads is not None:
1419                 for t in self._prefetch_threads:
1420                     self._prefetch_queue.put(None)
1421                 for t in self._prefetch_threads:
1422                     t.join()
1423             self._prefetch_threads = None
1424             self._prefetch_queue = None
1425
1426     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1427         """A stub for put().
1428
1429         This method is used in place of the real put() method when
1430         using local storage (see constructor's local_store argument).
1431
1432         copies and num_retries arguments are ignored: they are here
1433         only for the sake of offering the same call signature as
1434         put().
1435
1436         Data stored this way can be retrieved via local_store_get().
1437         """
1438         md5 = hashlib.md5(data).hexdigest()
1439         locator = '%s+%d' % (md5, len(data))
1440         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1441             f.write(data)
1442         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1443                   os.path.join(self.local_store, md5))
1444         return locator
1445
1446     def local_store_get(self, loc_s, num_retries=None):
1447         """Companion to local_store_put()."""
1448         try:
1449             locator = KeepLocator(loc_s)
1450         except ValueError:
1451             raise arvados.errors.NotFoundError(
1452                 "Invalid data locator: '%s'" % loc_s)
1453         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1454             return b''
1455         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1456             return f.read()
1457
1458     def local_store_head(self, loc_s, num_retries=None):
1459         """Companion to local_store_put()."""
1460         try:
1461             locator = KeepLocator(loc_s)
1462         except ValueError:
1463             raise arvados.errors.NotFoundError(
1464                 "Invalid data locator: '%s'" % loc_s)
1465         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1466             return True
1467         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1468             return True