21935: Move arvados.diskcache under arvados._internal
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import copy
6 import collections
7 import datetime
8 import hashlib
9 import errno
10 import io
11 import logging
12 import math
13 import os
14 import pycurl
15 import queue
16 import re
17 import socket
18 import ssl
19 import sys
20 import threading
21 import resource
22 import urllib.parse
23 import traceback
24 import weakref
25
26 from io import BytesIO
27
28 import arvados
29 import arvados.config as config
30 import arvados.errors
31 import arvados.retry as retry
32 import arvados.util
33
34 from arvados._internal import diskcache, Timer
35 from arvados._pycurlhelper import PyCurlHelper
36
37 _logger = logging.getLogger('arvados.keep')
38 global_client_object = None
39
40 # Monkey patch TCP constants when not available (apple). Values sourced from:
41 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
42 if sys.platform == 'darwin':
43     if not hasattr(socket, 'TCP_KEEPALIVE'):
44         socket.TCP_KEEPALIVE = 0x010
45     if not hasattr(socket, 'TCP_KEEPINTVL'):
46         socket.TCP_KEEPINTVL = 0x101
47     if not hasattr(socket, 'TCP_KEEPCNT'):
48         socket.TCP_KEEPCNT = 0x102
49
50 class KeepLocator(object):
51     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
52     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
53
54     def __init__(self, locator_str):
55         self.hints = []
56         self._perm_sig = None
57         self._perm_expiry = None
58         pieces = iter(locator_str.split('+'))
59         self.md5sum = next(pieces)
60         try:
61             self.size = int(next(pieces))
62         except StopIteration:
63             self.size = None
64         for hint in pieces:
65             if self.HINT_RE.match(hint) is None:
66                 raise ValueError("invalid hint format: {}".format(hint))
67             elif hint.startswith('A'):
68                 self.parse_permission_hint(hint)
69             else:
70                 self.hints.append(hint)
71
72     def __str__(self):
73         return '+'.join(
74             str(s)
75             for s in [self.md5sum, self.size,
76                       self.permission_hint()] + self.hints
77             if s is not None)
78
79     def stripped(self):
80         if self.size is not None:
81             return "%s+%i" % (self.md5sum, self.size)
82         else:
83             return self.md5sum
84
85     def _make_hex_prop(name, length):
86         # Build and return a new property with the given name that
87         # must be a hex string of the given length.
88         data_name = '_{}'.format(name)
89         def getter(self):
90             return getattr(self, data_name)
91         def setter(self, hex_str):
92             if not arvados.util.is_hex(hex_str, length):
93                 raise ValueError("{} is not a {}-digit hex string: {!r}".
94                                  format(name, length, hex_str))
95             setattr(self, data_name, hex_str)
96         return property(getter, setter)
97
98     md5sum = _make_hex_prop('md5sum', 32)
99     perm_sig = _make_hex_prop('perm_sig', 40)
100
101     @property
102     def perm_expiry(self):
103         return self._perm_expiry
104
105     @perm_expiry.setter
106     def perm_expiry(self, value):
107         if not arvados.util.is_hex(value, 1, 8):
108             raise ValueError(
109                 "permission timestamp must be a hex Unix timestamp: {}".
110                 format(value))
111         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
112
113     def permission_hint(self):
114         data = [self.perm_sig, self.perm_expiry]
115         if None in data:
116             return None
117         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
118         return "A{}@{:08x}".format(*data)
119
120     def parse_permission_hint(self, s):
121         try:
122             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
123         except IndexError:
124             raise ValueError("bad permission hint {}".format(s))
125
126     def permission_expired(self, as_of_dt=None):
127         if self.perm_expiry is None:
128             return False
129         elif as_of_dt is None:
130             as_of_dt = datetime.datetime.now()
131         return self.perm_expiry <= as_of_dt
132
133
134 class KeepBlockCache(object):
135     def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
136         self.cache_max = cache_max
137         self._cache = collections.OrderedDict()
138         self._cache_lock = threading.Lock()
139         self._max_slots = max_slots
140         self._disk_cache = disk_cache
141         self._disk_cache_dir = disk_cache_dir
142         self._cache_updating = threading.Condition(self._cache_lock)
143
144         if self._disk_cache and self._disk_cache_dir is None:
145             self._disk_cache_dir = str(arvados.util._BaseDirectories('CACHE').storage_path('keep'))
146
147         if self._max_slots == 0:
148             if self._disk_cache:
149                 # Each block uses two file descriptors, one used to
150                 # open it initially and hold the flock(), and a second
151                 # hidden one used by mmap().
152                 #
153                 # Set max slots to 1/8 of maximum file handles.  This
154                 # means we'll use at most 1/4 of total file handles.
155                 #
156                 # NOFILE typically defaults to 1024 on Linux so this
157                 # is 128 slots (256 file handles), which means we can
158                 # cache up to 8 GiB of 64 MiB blocks.  This leaves
159                 # 768 file handles for sockets and other stuff.
160                 #
161                 # When we want the ability to have more cache (e.g. in
162                 # arv-mount) we'll increase rlimit before calling
163                 # this.
164                 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
165             else:
166                 # RAM cache slots
167                 self._max_slots = 512
168
169         if self.cache_max == 0:
170             if self._disk_cache:
171                 fs = os.statvfs(self._disk_cache_dir)
172                 # Calculation of available space incorporates existing cache usage
173                 existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
174                 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
175                 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
176                 # pick smallest of:
177                 # 10% of total disk size
178                 # 25% of available space
179                 # max_slots * 64 MiB
180                 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
181             else:
182                 # 256 MiB in RAM
183                 self.cache_max = (256 * 1024 * 1024)
184
185         self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
186
187         self.cache_total = 0
188         if self._disk_cache:
189             self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
190             for slot in self._cache.values():
191                 self.cache_total += slot.size()
192             self.cap_cache()
193
194     class CacheSlot(object):
195         __slots__ = ("locator", "ready", "content")
196
197         def __init__(self, locator):
198             self.locator = locator
199             self.ready = threading.Event()
200             self.content = None
201
202         def get(self):
203             self.ready.wait()
204             return self.content
205
206         def set(self, value):
207             if self.content is not None:
208                 return False
209             self.content = value
210             self.ready.set()
211             return True
212
213         def size(self):
214             if self.content is None:
215                 return 0
216             else:
217                 return len(self.content)
218
219         def evict(self):
220             self.content = None
221
222
223     def _resize_cache(self, cache_max, max_slots):
224         # Try and make sure the contents of the cache do not exceed
225         # the supplied maximums.
226
227         if self.cache_total <= cache_max and len(self._cache) <= max_slots:
228             return
229
230         _evict_candidates = collections.deque(self._cache.values())
231         while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
232             slot = _evict_candidates.popleft()
233             if not slot.ready.is_set():
234                 continue
235
236             sz = slot.size()
237             slot.evict()
238             self.cache_total -= sz
239             del self._cache[slot.locator]
240
241
242     def cap_cache(self):
243         '''Cap the cache size to self.cache_max'''
244         with self._cache_updating:
245             self._resize_cache(self.cache_max, self._max_slots)
246             self._cache_updating.notify_all()
247
248     def _get(self, locator):
249         # Test if the locator is already in the cache
250         if locator in self._cache:
251             n = self._cache[locator]
252             if n.ready.is_set() and n.content is None:
253                 del self._cache[n.locator]
254                 return None
255             self._cache.move_to_end(locator)
256             return n
257         if self._disk_cache:
258             # see if it exists on disk
259             n = diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
260             if n is not None:
261                 self._cache[n.locator] = n
262                 self.cache_total += n.size()
263                 return n
264         return None
265
266     def get(self, locator):
267         with self._cache_lock:
268             return self._get(locator)
269
270     def reserve_cache(self, locator):
271         '''Reserve a cache slot for the specified locator,
272         or return the existing slot.'''
273         with self._cache_updating:
274             n = self._get(locator)
275             if n:
276                 return n, False
277             else:
278                 # Add a new cache slot for the locator
279                 self._resize_cache(self.cache_max, self._max_slots-1)
280                 while len(self._cache) >= self._max_slots:
281                     # If there isn't a slot available, need to wait
282                     # for something to happen that releases one of the
283                     # cache slots.  Idle for 200 ms or woken up by
284                     # another thread
285                     self._cache_updating.wait(timeout=0.2)
286                     self._resize_cache(self.cache_max, self._max_slots-1)
287
288                 if self._disk_cache:
289                     n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
290                 else:
291                     n = KeepBlockCache.CacheSlot(locator)
292                 self._cache[n.locator] = n
293                 return n, True
294
295     def set(self, slot, blob):
296         try:
297             if slot.set(blob):
298                 self.cache_total += slot.size()
299             return
300         except OSError as e:
301             if e.errno == errno.ENOMEM:
302                 # Reduce max slots to current - 4, cap cache and retry
303                 with self._cache_lock:
304                     self._max_slots = max(4, len(self._cache) - 4)
305             elif e.errno == errno.ENOSPC:
306                 # Reduce disk max space to current - 256 MiB, cap cache and retry
307                 with self._cache_lock:
308                     sm = sum(st.size() for st in self._cache.values())
309                     self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
310             elif e.errno == errno.ENODEV:
311                 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
312         except Exception as e:
313             pass
314         finally:
315             # Check if we should evict things from the cache.  Either
316             # because we added a new thing or there was an error and
317             # we possibly adjusted the limits down, so we might need
318             # to push something out.
319             self.cap_cache()
320
321         try:
322             # Only gets here if there was an error the first time. The
323             # exception handler adjusts limits downward in some cases
324             # to free up resources, which would make the operation
325             # succeed.
326             if slot.set(blob):
327                 self.cache_total += slot.size()
328         except Exception as e:
329             # It failed again.  Give up.
330             slot.set(None)
331             raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
332
333         self.cap_cache()
334
335 class Counter(object):
336     def __init__(self, v=0):
337         self._lk = threading.Lock()
338         self._val = v
339
340     def add(self, v):
341         with self._lk:
342             self._val += v
343
344     def get(self):
345         with self._lk:
346             return self._val
347
348
349 class KeepClient(object):
350     DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
351     DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
352
353     class KeepService(PyCurlHelper):
354         """Make requests to a single Keep service, and track results.
355
356         A KeepService is intended to last long enough to perform one
357         transaction (GET or PUT) against one Keep service. This can
358         involve calling either get() or put() multiple times in order
359         to retry after transient failures. However, calling both get()
360         and put() on a single instance -- or using the same instance
361         to access two different Keep services -- will not produce
362         sensible behavior.
363         """
364
365         HTTP_ERRORS = (
366             socket.error,
367             ssl.SSLError,
368             arvados.errors.HttpError,
369         )
370
371         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
372                      upload_counter=None,
373                      download_counter=None,
374                      headers={},
375                      insecure=False):
376             super(KeepClient.KeepService, self).__init__()
377             self.root = root
378             self._user_agent_pool = user_agent_pool
379             self._result = {'error': None}
380             self._usable = True
381             self._session = None
382             self._socket = None
383             self.get_headers = {'Accept': 'application/octet-stream'}
384             self.get_headers.update(headers)
385             self.put_headers = headers
386             self.upload_counter = upload_counter
387             self.download_counter = download_counter
388             self.insecure = insecure
389
390         def usable(self):
391             """Is it worth attempting a request?"""
392             return self._usable
393
394         def finished(self):
395             """Did the request succeed or encounter permanent failure?"""
396             return self._result['error'] == False or not self._usable
397
398         def last_result(self):
399             return self._result
400
401         def _get_user_agent(self):
402             try:
403                 return self._user_agent_pool.get(block=False)
404             except queue.Empty:
405                 return pycurl.Curl()
406
407         def _put_user_agent(self, ua):
408             try:
409                 ua.reset()
410                 self._user_agent_pool.put(ua, block=False)
411             except:
412                 ua.close()
413
414         def get(self, locator, method="GET", timeout=None):
415             # locator is a KeepLocator object.
416             url = self.root + str(locator)
417             _logger.debug("Request: %s %s", method, url)
418             curl = self._get_user_agent()
419             ok = None
420             try:
421                 with Timer() as t:
422                     self._headers = {}
423                     response_body = BytesIO()
424                     curl.setopt(pycurl.NOSIGNAL, 1)
425                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
426                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
427                     curl.setopt(pycurl.URL, url.encode('utf-8'))
428                     curl.setopt(pycurl.HTTPHEADER, [
429                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
430                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
431                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
432                     if self.insecure:
433                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
434                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
435                     else:
436                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
437                     if method == "HEAD":
438                         curl.setopt(pycurl.NOBODY, True)
439                     else:
440                         curl.setopt(pycurl.HTTPGET, True)
441                     self._setcurltimeouts(curl, timeout, method=="HEAD")
442
443                     try:
444                         curl.perform()
445                     except Exception as e:
446                         raise arvados.errors.HttpError(0, str(e))
447                     finally:
448                         if self._socket:
449                             self._socket.close()
450                             self._socket = None
451                     self._result = {
452                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
453                         'body': response_body.getvalue(),
454                         'headers': self._headers,
455                         'error': False,
456                     }
457
458                 ok = retry.check_http_response_success(self._result['status_code'])
459                 if not ok:
460                     self._result['error'] = arvados.errors.HttpError(
461                         self._result['status_code'],
462                         self._headers.get('x-status-line', 'Error'))
463             except self.HTTP_ERRORS as e:
464                 self._result = {
465                     'error': e,
466                 }
467             self._usable = ok != False
468             if self._result.get('status_code', None):
469                 # The client worked well enough to get an HTTP status
470                 # code, so presumably any problems are just on the
471                 # server side and it's OK to reuse the client.
472                 self._put_user_agent(curl)
473             else:
474                 # Don't return this client to the pool, in case it's
475                 # broken.
476                 curl.close()
477             if not ok:
478                 _logger.debug("Request fail: GET %s => %s: %s",
479                               url, type(self._result['error']), str(self._result['error']))
480                 return None
481             if method == "HEAD":
482                 _logger.info("HEAD %s: %s bytes",
483                          self._result['status_code'],
484                          self._result.get('content-length'))
485                 if self._result['headers'].get('x-keep-locator'):
486                     # This is a response to a remote block copy request, return
487                     # the local copy block locator.
488                     return self._result['headers'].get('x-keep-locator')
489                 return True
490
491             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
492                          self._result['status_code'],
493                          len(self._result['body']),
494                          t.msecs,
495                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
496
497             if self.download_counter:
498                 self.download_counter.add(len(self._result['body']))
499             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
500             if resp_md5 != locator.md5sum:
501                 _logger.warning("Checksum fail: md5(%s) = %s",
502                                 url, resp_md5)
503                 self._result['error'] = arvados.errors.HttpError(
504                     0, 'Checksum fail')
505                 return None
506             return self._result['body']
507
508         def put(self, hash_s, body, timeout=None, headers={}):
509             put_headers = copy.copy(self.put_headers)
510             put_headers.update(headers)
511             url = self.root + hash_s
512             _logger.debug("Request: PUT %s", url)
513             curl = self._get_user_agent()
514             ok = None
515             try:
516                 with Timer() as t:
517                     self._headers = {}
518                     body_reader = BytesIO(body)
519                     response_body = BytesIO()
520                     curl.setopt(pycurl.NOSIGNAL, 1)
521                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
522                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
523                     curl.setopt(pycurl.URL, url.encode('utf-8'))
524                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
525                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
526                     # response) instead of sending the request body immediately.
527                     # This allows the server to reject the request if the request
528                     # is invalid or the server is read-only, without waiting for
529                     # the client to send the entire block.
530                     curl.setopt(pycurl.UPLOAD, True)
531                     curl.setopt(pycurl.INFILESIZE, len(body))
532                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
533                     curl.setopt(pycurl.HTTPHEADER, [
534                         '{}: {}'.format(k,v) for k,v in put_headers.items()])
535                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
536                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
537                     if self.insecure:
538                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
539                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
540                     else:
541                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
542                     self._setcurltimeouts(curl, timeout)
543                     try:
544                         curl.perform()
545                     except Exception as e:
546                         raise arvados.errors.HttpError(0, str(e))
547                     finally:
548                         if self._socket:
549                             self._socket.close()
550                             self._socket = None
551                     self._result = {
552                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
553                         'body': response_body.getvalue().decode('utf-8'),
554                         'headers': self._headers,
555                         'error': False,
556                     }
557                 ok = retry.check_http_response_success(self._result['status_code'])
558                 if not ok:
559                     self._result['error'] = arvados.errors.HttpError(
560                         self._result['status_code'],
561                         self._headers.get('x-status-line', 'Error'))
562             except self.HTTP_ERRORS as e:
563                 self._result = {
564                     'error': e,
565                 }
566             self._usable = ok != False # still usable if ok is True or None
567             if self._result.get('status_code', None):
568                 # Client is functional. See comment in get().
569                 self._put_user_agent(curl)
570             else:
571                 curl.close()
572             if not ok:
573                 _logger.debug("Request fail: PUT %s => %s: %s",
574                               url, type(self._result['error']), str(self._result['error']))
575                 return False
576             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
577                          self._result['status_code'],
578                          len(body),
579                          t.msecs,
580                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
581             if self.upload_counter:
582                 self.upload_counter.add(len(body))
583             return True
584
585
586     class KeepWriterQueue(queue.Queue):
587         def __init__(self, copies, classes=[]):
588             queue.Queue.__init__(self) # Old-style superclass
589             self.wanted_copies = copies
590             self.wanted_storage_classes = classes
591             self.successful_copies = 0
592             self.confirmed_storage_classes = {}
593             self.response = None
594             self.storage_classes_tracking = True
595             self.queue_data_lock = threading.RLock()
596             self.pending_tries = max(copies, len(classes))
597             self.pending_tries_notification = threading.Condition()
598
599         def write_success(self, response, replicas_nr, classes_confirmed):
600             with self.queue_data_lock:
601                 self.successful_copies += replicas_nr
602                 if classes_confirmed is None:
603                     self.storage_classes_tracking = False
604                 elif self.storage_classes_tracking:
605                     for st_class, st_copies in classes_confirmed.items():
606                         try:
607                             self.confirmed_storage_classes[st_class] += st_copies
608                         except KeyError:
609                             self.confirmed_storage_classes[st_class] = st_copies
610                     self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
611                 self.response = response
612             with self.pending_tries_notification:
613                 self.pending_tries_notification.notify_all()
614
615         def write_fail(self, ks):
616             with self.pending_tries_notification:
617                 self.pending_tries += 1
618                 self.pending_tries_notification.notify()
619
620         def pending_copies(self):
621             with self.queue_data_lock:
622                 return self.wanted_copies - self.successful_copies
623
624         def satisfied_classes(self):
625             with self.queue_data_lock:
626                 if not self.storage_classes_tracking:
627                     # Notifies disabled storage classes expectation to
628                     # the outer loop.
629                     return None
630             return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
631
632         def pending_classes(self):
633             with self.queue_data_lock:
634                 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
635                     return []
636                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
637                 for st_class, st_copies in self.confirmed_storage_classes.items():
638                     if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
639                         unsatisfied_classes.remove(st_class)
640                 return unsatisfied_classes
641
642         def get_next_task(self):
643             with self.pending_tries_notification:
644                 while True:
645                     if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
646                         # This notify_all() is unnecessary --
647                         # write_success() already called notify_all()
648                         # when pending<1 became true, so it's not
649                         # possible for any other thread to be in
650                         # wait() now -- but it's cheap insurance
651                         # against deadlock so we do it anyway:
652                         self.pending_tries_notification.notify_all()
653                         # Drain the queue and then raise Queue.Empty
654                         while True:
655                             self.get_nowait()
656                             self.task_done()
657                     elif self.pending_tries > 0:
658                         service, service_root = self.get_nowait()
659                         if service.finished():
660                             self.task_done()
661                             continue
662                         self.pending_tries -= 1
663                         return service, service_root
664                     elif self.empty():
665                         self.pending_tries_notification.notify_all()
666                         raise queue.Empty
667                     else:
668                         self.pending_tries_notification.wait()
669
670
671     class KeepWriterThreadPool(object):
672         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
673             self.total_task_nr = 0
674             if (not max_service_replicas) or (max_service_replicas >= copies):
675                 num_threads = 1
676             else:
677                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
678             _logger.debug("Pool max threads is %d", num_threads)
679             self.workers = []
680             self.queue = KeepClient.KeepWriterQueue(copies, classes)
681             # Create workers
682             for _ in range(num_threads):
683                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
684                 self.workers.append(w)
685
686         def add_task(self, ks, service_root):
687             self.queue.put((ks, service_root))
688             self.total_task_nr += 1
689
690         def done(self):
691             return self.queue.successful_copies, self.queue.satisfied_classes()
692
693         def join(self):
694             # Start workers
695             for worker in self.workers:
696                 worker.start()
697             # Wait for finished work
698             self.queue.join()
699
700         def response(self):
701             return self.queue.response
702
703
704     class KeepWriterThread(threading.Thread):
705         class TaskFailed(RuntimeError): pass
706
707         def __init__(self, queue, data, data_hash, timeout=None):
708             super(KeepClient.KeepWriterThread, self).__init__()
709             self.timeout = timeout
710             self.queue = queue
711             self.data = data
712             self.data_hash = data_hash
713             self.daemon = True
714
715         def run(self):
716             while True:
717                 try:
718                     service, service_root = self.queue.get_next_task()
719                 except queue.Empty:
720                     return
721                 try:
722                     locator, copies, classes = self.do_task(service, service_root)
723                 except Exception as e:
724                     if not isinstance(e, self.TaskFailed):
725                         _logger.exception("Exception in KeepWriterThread")
726                     self.queue.write_fail(service)
727                 else:
728                     self.queue.write_success(locator, copies, classes)
729                 finally:
730                     self.queue.task_done()
731
732         def do_task(self, service, service_root):
733             classes = self.queue.pending_classes()
734             headers = {}
735             if len(classes) > 0:
736                 classes.sort()
737                 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
738             success = bool(service.put(self.data_hash,
739                                         self.data,
740                                         timeout=self.timeout,
741                                         headers=headers))
742             result = service.last_result()
743
744             if not success:
745                 if result.get('status_code'):
746                     _logger.debug("Request fail: PUT %s => %s %s",
747                                   self.data_hash,
748                                   result.get('status_code'),
749                                   result.get('body'))
750                 raise self.TaskFailed()
751
752             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
753                           str(threading.current_thread()),
754                           self.data_hash,
755                           len(self.data),
756                           service_root)
757             try:
758                 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
759             except (KeyError, ValueError):
760                 replicas_stored = 1
761
762             classes_confirmed = {}
763             try:
764                 scch = result['headers']['x-keep-storage-classes-confirmed']
765                 for confirmation in scch.replace(' ', '').split(','):
766                     if '=' in confirmation:
767                         stored_class, stored_copies = confirmation.split('=')[:2]
768                         classes_confirmed[stored_class] = int(stored_copies)
769             except (KeyError, ValueError):
770                 # Storage classes confirmed header missing or corrupt
771                 classes_confirmed = None
772
773             return result['body'].strip(), replicas_stored, classes_confirmed
774
775
776     def __init__(self, api_client=None, proxy=None,
777                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
778                  api_token=None, local_store=None, block_cache=None,
779                  num_retries=10, session=None, num_prefetch_threads=None):
780         """Initialize a new KeepClient.
781
782         Arguments:
783         :api_client:
784           The API client to use to find Keep services.  If not
785           provided, KeepClient will build one from available Arvados
786           configuration.
787
788         :proxy:
789           If specified, this KeepClient will send requests to this Keep
790           proxy.  Otherwise, KeepClient will fall back to the setting of the
791           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
792           If you want to KeepClient does not use a proxy, pass in an empty
793           string.
794
795         :timeout:
796           The initial timeout (in seconds) for HTTP requests to Keep
797           non-proxy servers.  A tuple of three floats is interpreted as
798           (connection_timeout, read_timeout, minimum_bandwidth). A connection
799           will be aborted if the average traffic rate falls below
800           minimum_bandwidth bytes per second over an interval of read_timeout
801           seconds. Because timeouts are often a result of transient server
802           load, the actual connection timeout will be increased by a factor
803           of two on each retry.
804           Default: (2, 256, 32768).
805
806         :proxy_timeout:
807           The initial timeout (in seconds) for HTTP requests to
808           Keep proxies. A tuple of three floats is interpreted as
809           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
810           described above for adjusting connection timeouts on retry also
811           applies.
812           Default: (20, 256, 32768).
813
814         :api_token:
815           If you're not using an API client, but only talking
816           directly to a Keep proxy, this parameter specifies an API token
817           to authenticate Keep requests.  It is an error to specify both
818           api_client and api_token.  If you specify neither, KeepClient
819           will use one available from the Arvados configuration.
820
821         :local_store:
822           If specified, this KeepClient will bypass Keep
823           services, and save data to the named directory.  If unspecified,
824           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
825           environment variable.  If you want to ensure KeepClient does not
826           use local storage, pass in an empty string.  This is primarily
827           intended to mock a server for testing.
828
829         :num_retries:
830           The default number of times to retry failed requests.
831           This will be used as the default num_retries value when get() and
832           put() are called.  Default 10.
833         """
834         self.lock = threading.Lock()
835         if proxy is None:
836             if config.get('ARVADOS_KEEP_SERVICES'):
837                 proxy = config.get('ARVADOS_KEEP_SERVICES')
838             else:
839                 proxy = config.get('ARVADOS_KEEP_PROXY')
840         if api_token is None:
841             if api_client is None:
842                 api_token = config.get('ARVADOS_API_TOKEN')
843             else:
844                 api_token = api_client.api_token
845         elif api_client is not None:
846             raise ValueError(
847                 "can't build KeepClient with both API client and token")
848         if local_store is None:
849             local_store = os.environ.get('KEEP_LOCAL_STORE')
850
851         if api_client is None:
852             self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
853         else:
854             self.insecure = api_client.insecure
855
856         self.block_cache = block_cache if block_cache else KeepBlockCache()
857         self.timeout = timeout
858         self.proxy_timeout = proxy_timeout
859         self._user_agent_pool = queue.LifoQueue()
860         self.upload_counter = Counter()
861         self.download_counter = Counter()
862         self.put_counter = Counter()
863         self.get_counter = Counter()
864         self.hits_counter = Counter()
865         self.misses_counter = Counter()
866         self._storage_classes_unsupported_warning = False
867         self._default_classes = []
868         if num_prefetch_threads is not None:
869             self.num_prefetch_threads = num_prefetch_threads
870         else:
871             self.num_prefetch_threads = 2
872         self._prefetch_queue = None
873         self._prefetch_threads = None
874
875         if local_store:
876             self.local_store = local_store
877             self.head = self.local_store_head
878             self.get = self.local_store_get
879             self.put = self.local_store_put
880         else:
881             self.num_retries = num_retries
882             self.max_replicas_per_service = None
883             if proxy:
884                 proxy_uris = proxy.split()
885                 for i in range(len(proxy_uris)):
886                     if not proxy_uris[i].endswith('/'):
887                         proxy_uris[i] += '/'
888                     # URL validation
889                     url = urllib.parse.urlparse(proxy_uris[i])
890                     if not (url.scheme and url.netloc):
891                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
892                 self.api_token = api_token
893                 self._gateway_services = {}
894                 self._keep_services = [{
895                     'uuid': "00000-bi6l4-%015d" % idx,
896                     'service_type': 'proxy',
897                     '_service_root': uri,
898                     } for idx, uri in enumerate(proxy_uris)]
899                 self._writable_services = self._keep_services
900                 self.using_proxy = True
901                 self._static_services_list = True
902             else:
903                 # It's important to avoid instantiating an API client
904                 # unless we actually need one, for testing's sake.
905                 if api_client is None:
906                     api_client = arvados.api('v1')
907                 self.api_client = api_client
908                 self.api_token = api_client.api_token
909                 self._gateway_services = {}
910                 self._keep_services = None
911                 self._writable_services = None
912                 self.using_proxy = None
913                 self._static_services_list = False
914                 try:
915                     self._default_classes = [
916                         k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
917                 except KeyError:
918                     # We're talking to an old cluster
919                     pass
920
921     def current_timeout(self, attempt_number):
922         """Return the appropriate timeout to use for this client.
923
924         The proxy timeout setting if the backend service is currently a proxy,
925         the regular timeout setting otherwise.  The `attempt_number` indicates
926         how many times the operation has been tried already (starting from 0
927         for the first try), and scales the connection timeout portion of the
928         return value accordingly.
929
930         """
931         # TODO(twp): the timeout should be a property of a
932         # KeepService, not a KeepClient. See #4488.
933         t = self.proxy_timeout if self.using_proxy else self.timeout
934         if len(t) == 2:
935             return (t[0] * (1 << attempt_number), t[1])
936         else:
937             return (t[0] * (1 << attempt_number), t[1], t[2])
938     def _any_nondisk_services(self, service_list):
939         return any(ks.get('service_type', 'disk') != 'disk'
940                    for ks in service_list)
941
942     def build_services_list(self, force_rebuild=False):
943         if (self._static_services_list or
944               (self._keep_services and not force_rebuild)):
945             return
946         with self.lock:
947             try:
948                 keep_services = self.api_client.keep_services().accessible()
949             except Exception:  # API server predates Keep services.
950                 keep_services = self.api_client.keep_disks().list()
951
952             # Gateway services are only used when specified by UUID,
953             # so there's nothing to gain by filtering them by
954             # service_type.
955             self._gateway_services = {ks['uuid']: ks for ks in
956                                       keep_services.execute()['items']}
957             if not self._gateway_services:
958                 raise arvados.errors.NoKeepServersError()
959
960             # Precompute the base URI for each service.
961             for r in self._gateway_services.values():
962                 host = r['service_host']
963                 if not host.startswith('[') and host.find(':') >= 0:
964                     # IPv6 URIs must be formatted like http://[::1]:80/...
965                     host = '[' + host + ']'
966                 r['_service_root'] = "{}://{}:{:d}/".format(
967                     'https' if r['service_ssl_flag'] else 'http',
968                     host,
969                     r['service_port'])
970
971             _logger.debug(str(self._gateway_services))
972             self._keep_services = [
973                 ks for ks in self._gateway_services.values()
974                 if not ks.get('service_type', '').startswith('gateway:')]
975             self._writable_services = [ks for ks in self._keep_services
976                                        if not ks.get('read_only')]
977
978             # For disk type services, max_replicas_per_service is 1
979             # It is unknown (unlimited) for other service types.
980             if self._any_nondisk_services(self._writable_services):
981                 self.max_replicas_per_service = None
982             else:
983                 self.max_replicas_per_service = 1
984
985     def _service_weight(self, data_hash, service_uuid):
986         """Compute the weight of a Keep service endpoint for a data
987         block with a known hash.
988
989         The weight is md5(h + u) where u is the last 15 characters of
990         the service endpoint's UUID.
991         """
992         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
993
994     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
995         """Return an array of Keep service endpoints, in the order in
996         which they should be probed when reading or writing data with
997         the given hash+hints.
998         """
999         self.build_services_list(force_rebuild)
1000
1001         sorted_roots = []
1002         # Use the services indicated by the given +K@... remote
1003         # service hints, if any are present and can be resolved to a
1004         # URI.
1005         for hint in locator.hints:
1006             if hint.startswith('K@'):
1007                 if len(hint) == 7:
1008                     sorted_roots.append(
1009                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1010                 elif len(hint) == 29:
1011                     svc = self._gateway_services.get(hint[2:])
1012                     if svc:
1013                         sorted_roots.append(svc['_service_root'])
1014
1015         # Sort the available local services by weight (heaviest first)
1016         # for this locator, and return their service_roots (base URIs)
1017         # in that order.
1018         use_services = self._keep_services
1019         if need_writable:
1020             use_services = self._writable_services
1021         self.using_proxy = self._any_nondisk_services(use_services)
1022         sorted_roots.extend([
1023             svc['_service_root'] for svc in sorted(
1024                 use_services,
1025                 reverse=True,
1026                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1027         _logger.debug("{}: {}".format(locator, sorted_roots))
1028         return sorted_roots
1029
1030     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1031         # roots_map is a dictionary, mapping Keep service root strings
1032         # to KeepService objects.  Poll for Keep services, and add any
1033         # new ones to roots_map.  Return the current list of local
1034         # root strings.
1035         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1036         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1037         for root in local_roots:
1038             if root not in roots_map:
1039                 roots_map[root] = self.KeepService(
1040                     root, self._user_agent_pool,
1041                     upload_counter=self.upload_counter,
1042                     download_counter=self.download_counter,
1043                     headers=headers,
1044                     insecure=self.insecure)
1045         return local_roots
1046
1047     @staticmethod
1048     def _check_loop_result(result):
1049         # KeepClient RetryLoops should save results as a 2-tuple: the
1050         # actual result of the request, and the number of servers available
1051         # to receive the request this round.
1052         # This method returns True if there's a real result, False if
1053         # there are no more servers available, otherwise None.
1054         if isinstance(result, Exception):
1055             return None
1056         result, tried_server_count = result
1057         if (result is not None) and (result is not False):
1058             return True
1059         elif tried_server_count < 1:
1060             _logger.info("No more Keep services to try; giving up")
1061             return False
1062         else:
1063             return None
1064
1065     def get_from_cache(self, loc_s):
1066         """Fetch a block only if is in the cache, otherwise return None."""
1067         locator = KeepLocator(loc_s)
1068         slot = self.block_cache.get(locator.md5sum)
1069         if slot is not None and slot.ready.is_set():
1070             return slot.get()
1071         else:
1072             return None
1073
1074     def refresh_signature(self, loc):
1075         """Ask Keep to get the remote block and return its local signature"""
1076         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1077         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1078
1079     @retry.retry_method
1080     def head(self, loc_s, **kwargs):
1081         return self._get_or_head(loc_s, method="HEAD", **kwargs)
1082
1083     @retry.retry_method
1084     def get(self, loc_s, **kwargs):
1085         return self._get_or_head(loc_s, method="GET", **kwargs)
1086
1087     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1088         """Get data from Keep.
1089
1090         This method fetches one or more blocks of data from Keep.  It
1091         sends a request each Keep service registered with the API
1092         server (or the proxy provided when this client was
1093         instantiated), then each service named in location hints, in
1094         sequence.  As soon as one service provides the data, it's
1095         returned.
1096
1097         Arguments:
1098         * loc_s: A string of one or more comma-separated locators to fetch.
1099           This method returns the concatenation of these blocks.
1100         * num_retries: The number of times to retry GET requests to
1101           *each* Keep server if it returns temporary failures, with
1102           exponential backoff.  Note that, in each loop, the method may try
1103           to fetch data from every available Keep service, along with any
1104           that are named in location hints in the locator.  The default value
1105           is set when the KeepClient is initialized.
1106         """
1107         if ',' in loc_s:
1108             return ''.join(self.get(x) for x in loc_s.split(','))
1109
1110         self.get_counter.add(1)
1111
1112         request_id = (request_id or
1113                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1114                       arvados.util.new_request_id())
1115         if headers is None:
1116             headers = {}
1117         headers['X-Request-Id'] = request_id
1118
1119         slot = None
1120         blob = None
1121         try:
1122             locator = KeepLocator(loc_s)
1123             if method == "GET":
1124                 while slot is None:
1125                     slot, first = self.block_cache.reserve_cache(locator.md5sum)
1126                     if first:
1127                         # Fresh and empty "first time it is used" slot
1128                         break
1129                     if prefetch:
1130                         # this is request for a prefetch to fill in
1131                         # the cache, don't need to wait for the
1132                         # result, so if it is already in flight return
1133                         # immediately.  Clear 'slot' to prevent
1134                         # finally block from calling slot.set()
1135                         if slot.ready.is_set():
1136                             slot.get()
1137                         slot = None
1138                         return None
1139
1140                     blob = slot.get()
1141                     if blob is not None:
1142                         self.hits_counter.add(1)
1143                         return blob
1144
1145                     # If blob is None, this means either
1146                     #
1147                     # (a) another thread was fetching this block and
1148                     # failed with an error or
1149                     #
1150                     # (b) cache thrashing caused the slot to be
1151                     # evicted (content set to None) by another thread
1152                     # between the call to reserve_cache() and get().
1153                     #
1154                     # We'll handle these cases by reserving a new slot
1155                     # and then doing a full GET request.
1156                     slot = None
1157
1158             self.misses_counter.add(1)
1159
1160             # If the locator has hints specifying a prefix (indicating a
1161             # remote keepproxy) or the UUID of a local gateway service,
1162             # read data from the indicated service(s) instead of the usual
1163             # list of local disk services.
1164             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1165                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1166             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1167                                for hint in locator.hints if (
1168                                        hint.startswith('K@') and
1169                                        len(hint) == 29 and
1170                                        self._gateway_services.get(hint[2:])
1171                                        )])
1172             # Map root URLs to their KeepService objects.
1173             roots_map = {
1174                 root: self.KeepService(root, self._user_agent_pool,
1175                                        upload_counter=self.upload_counter,
1176                                        download_counter=self.download_counter,
1177                                        headers=headers,
1178                                        insecure=self.insecure)
1179                 for root in hint_roots
1180             }
1181
1182             # See #3147 for a discussion of the loop implementation.  Highlights:
1183             # * Refresh the list of Keep services after each failure, in case
1184             #   it's being updated.
1185             # * Retry until we succeed, we're out of retries, or every available
1186             #   service has returned permanent failure.
1187             sorted_roots = []
1188             roots_map = {}
1189             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1190                                    backoff_start=2)
1191             for tries_left in loop:
1192                 try:
1193                     sorted_roots = self.map_new_services(
1194                         roots_map, locator,
1195                         force_rebuild=(tries_left < num_retries),
1196                         need_writable=False,
1197                         headers=headers)
1198                 except Exception as error:
1199                     loop.save_result(error)
1200                     continue
1201
1202                 # Query KeepService objects that haven't returned
1203                 # permanent failure, in our specified shuffle order.
1204                 services_to_try = [roots_map[root]
1205                                    for root in sorted_roots
1206                                    if roots_map[root].usable()]
1207                 for keep_service in services_to_try:
1208                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1209                     if blob is not None:
1210                         break
1211                 loop.save_result((blob, len(services_to_try)))
1212
1213             # Always cache the result, then return it if we succeeded.
1214             if loop.success():
1215                 return blob
1216         finally:
1217             if slot is not None:
1218                 self.block_cache.set(slot, blob)
1219
1220         # Q: Including 403 is necessary for the Keep tests to continue
1221         # passing, but maybe they should expect KeepReadError instead?
1222         not_founds = sum(1 for key in sorted_roots
1223                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1224         service_errors = ((key, roots_map[key].last_result()['error'])
1225                           for key in sorted_roots)
1226         if not roots_map:
1227             raise arvados.errors.KeepReadError(
1228                 "[{}] failed to read {}: no Keep services available ({})".format(
1229                     request_id, loc_s, loop.last_result()))
1230         elif not_founds == len(sorted_roots):
1231             raise arvados.errors.NotFoundError(
1232                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1233         else:
1234             raise arvados.errors.KeepReadError(
1235                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1236
1237     @retry.retry_method
1238     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1239         """Save data in Keep.
1240
1241         This method will get a list of Keep services from the API server, and
1242         send the data to each one simultaneously in a new thread.  Once the
1243         uploads are finished, if enough copies are saved, this method returns
1244         the most recent HTTP response body.  If requests fail to upload
1245         enough copies, this method raises KeepWriteError.
1246
1247         Arguments:
1248         * data: The string of data to upload.
1249         * copies: The number of copies that the user requires be saved.
1250           Default 2.
1251         * num_retries: The number of times to retry PUT requests to
1252           *each* Keep server if it returns temporary failures, with
1253           exponential backoff.  The default value is set when the
1254           KeepClient is initialized.
1255         * classes: An optional list of storage class names where copies should
1256           be written.
1257         """
1258
1259         classes = classes or self._default_classes
1260
1261         if not isinstance(data, bytes):
1262             data = data.encode()
1263
1264         self.put_counter.add(1)
1265
1266         data_hash = hashlib.md5(data).hexdigest()
1267         loc_s = data_hash + '+' + str(len(data))
1268         if copies < 1:
1269             return loc_s
1270         locator = KeepLocator(loc_s)
1271
1272         request_id = (request_id or
1273                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1274                       arvados.util.new_request_id())
1275         headers = {
1276             'X-Request-Id': request_id,
1277             'X-Keep-Desired-Replicas': str(copies),
1278         }
1279         roots_map = {}
1280         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1281                                backoff_start=2)
1282         done_copies = 0
1283         done_classes = []
1284         for tries_left in loop:
1285             try:
1286                 sorted_roots = self.map_new_services(
1287                     roots_map, locator,
1288                     force_rebuild=(tries_left < num_retries),
1289                     need_writable=True,
1290                     headers=headers)
1291             except Exception as error:
1292                 loop.save_result(error)
1293                 continue
1294
1295             pending_classes = []
1296             if done_classes is not None:
1297                 pending_classes = list(set(classes) - set(done_classes))
1298             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1299                                                         data_hash=data_hash,
1300                                                         copies=copies - done_copies,
1301                                                         max_service_replicas=self.max_replicas_per_service,
1302                                                         timeout=self.current_timeout(num_retries - tries_left),
1303                                                         classes=pending_classes)
1304             for service_root, ks in [(root, roots_map[root])
1305                                      for root in sorted_roots]:
1306                 if ks.finished():
1307                     continue
1308                 writer_pool.add_task(ks, service_root)
1309             writer_pool.join()
1310             pool_copies, pool_classes = writer_pool.done()
1311             done_copies += pool_copies
1312             if (done_classes is not None) and (pool_classes is not None):
1313                 done_classes += pool_classes
1314                 loop.save_result(
1315                     (done_copies >= copies and set(done_classes) == set(classes),
1316                     writer_pool.total_task_nr))
1317             else:
1318                 # Old keepstore contacted without storage classes support:
1319                 # success is determined only by successful copies.
1320                 #
1321                 # Disable storage classes tracking from this point forward.
1322                 if not self._storage_classes_unsupported_warning:
1323                     self._storage_classes_unsupported_warning = True
1324                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1325                 done_classes = None
1326                 loop.save_result(
1327                     (done_copies >= copies, writer_pool.total_task_nr))
1328
1329         if loop.success():
1330             return writer_pool.response()
1331         if not roots_map:
1332             raise arvados.errors.KeepWriteError(
1333                 "[{}] failed to write {}: no Keep services available ({})".format(
1334                     request_id, data_hash, loop.last_result()))
1335         else:
1336             service_errors = ((key, roots_map[key].last_result()['error'])
1337                               for key in sorted_roots
1338                               if roots_map[key].last_result()['error'])
1339             raise arvados.errors.KeepWriteError(
1340                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1341                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1342
1343     def _block_prefetch_worker(self):
1344         """The background downloader thread."""
1345         while True:
1346             try:
1347                 b = self._prefetch_queue.get()
1348                 if b is None:
1349                     return
1350                 self.get(b, prefetch=True)
1351             except Exception:
1352                 _logger.exception("Exception doing block prefetch")
1353
1354     def _start_prefetch_threads(self):
1355         if self._prefetch_threads is None:
1356             with self.lock:
1357                 if self._prefetch_threads is not None:
1358                     return
1359                 self._prefetch_queue = queue.Queue()
1360                 self._prefetch_threads = []
1361                 for i in range(0, self.num_prefetch_threads):
1362                     thread = threading.Thread(target=self._block_prefetch_worker)
1363                     self._prefetch_threads.append(thread)
1364                     thread.daemon = True
1365                     thread.start()
1366
1367     def block_prefetch(self, locator):
1368         """
1369         This relies on the fact that KeepClient implements a block cache,
1370         so repeated requests for the same block will not result in repeated
1371         downloads (unless the block is evicted from the cache.)  This method
1372         does not block.
1373         """
1374
1375         if self.block_cache.get(locator) is not None:
1376             return
1377
1378         self._start_prefetch_threads()
1379         self._prefetch_queue.put(locator)
1380
1381     def stop_prefetch_threads(self):
1382         with self.lock:
1383             if self._prefetch_threads is not None:
1384                 for t in self._prefetch_threads:
1385                     self._prefetch_queue.put(None)
1386                 for t in self._prefetch_threads:
1387                     t.join()
1388             self._prefetch_threads = None
1389             self._prefetch_queue = None
1390
1391     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1392         """A stub for put().
1393
1394         This method is used in place of the real put() method when
1395         using local storage (see constructor's local_store argument).
1396
1397         copies and num_retries arguments are ignored: they are here
1398         only for the sake of offering the same call signature as
1399         put().
1400
1401         Data stored this way can be retrieved via local_store_get().
1402         """
1403         md5 = hashlib.md5(data).hexdigest()
1404         locator = '%s+%d' % (md5, len(data))
1405         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1406             f.write(data)
1407         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1408                   os.path.join(self.local_store, md5))
1409         return locator
1410
1411     def local_store_get(self, loc_s, num_retries=None):
1412         """Companion to local_store_put()."""
1413         try:
1414             locator = KeepLocator(loc_s)
1415         except ValueError:
1416             raise arvados.errors.NotFoundError(
1417                 "Invalid data locator: '%s'" % loc_s)
1418         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1419             return b''
1420         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1421             return f.read()
1422
1423     def local_store_head(self, loc_s, num_retries=None):
1424         """Companion to local_store_put()."""
1425         try:
1426             locator = KeepLocator(loc_s)
1427         except ValueError:
1428             raise arvados.errors.NotFoundError(
1429                 "Invalid data locator: '%s'" % loc_s)
1430         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1431             return True
1432         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1433             return True