Merge branch '21773-keep-service-discovery'
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import copy
6 import collections
7 import datetime
8 import hashlib
9 import errno
10 import io
11 import logging
12 import math
13 import os
14 import pycurl
15 import queue
16 import re
17 import socket
18 import ssl
19 import sys
20 import threading
21 import resource
22 import urllib.parse
23 import traceback
24 import weakref
25
26 from io import BytesIO
27
28 import arvados
29 import arvados.config as config
30 import arvados.errors
31 import arvados.retry as retry
32 import arvados.util
33
34 from ._internal import basedirs, diskcache, Timer
35 from ._internal.pycurl import PyCurlHelper
36
37 _logger = logging.getLogger('arvados.keep')
38 global_client_object = None
39
40 # Monkey patch TCP constants when not available (apple). Values sourced from:
41 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
42 if sys.platform == 'darwin':
43     if not hasattr(socket, 'TCP_KEEPALIVE'):
44         socket.TCP_KEEPALIVE = 0x010
45     if not hasattr(socket, 'TCP_KEEPINTVL'):
46         socket.TCP_KEEPINTVL = 0x101
47     if not hasattr(socket, 'TCP_KEEPCNT'):
48         socket.TCP_KEEPCNT = 0x102
49
50 class KeepLocator(object):
51     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
52     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
53
54     def __init__(self, locator_str):
55         self.hints = []
56         self._perm_sig = None
57         self._perm_expiry = None
58         pieces = iter(locator_str.split('+'))
59         self.md5sum = next(pieces)
60         try:
61             self.size = int(next(pieces))
62         except StopIteration:
63             self.size = None
64         for hint in pieces:
65             if self.HINT_RE.match(hint) is None:
66                 raise ValueError("invalid hint format: {}".format(hint))
67             elif hint.startswith('A'):
68                 self.parse_permission_hint(hint)
69             else:
70                 self.hints.append(hint)
71
72     def __str__(self):
73         return '+'.join(
74             str(s)
75             for s in [self.md5sum, self.size,
76                       self.permission_hint()] + self.hints
77             if s is not None)
78
79     def stripped(self):
80         if self.size is not None:
81             return "%s+%i" % (self.md5sum, self.size)
82         else:
83             return self.md5sum
84
85     def _make_hex_prop(name, length):
86         # Build and return a new property with the given name that
87         # must be a hex string of the given length.
88         data_name = '_{}'.format(name)
89         def getter(self):
90             return getattr(self, data_name)
91         def setter(self, hex_str):
92             if not arvados.util.is_hex(hex_str, length):
93                 raise ValueError("{} is not a {}-digit hex string: {!r}".
94                                  format(name, length, hex_str))
95             setattr(self, data_name, hex_str)
96         return property(getter, setter)
97
98     md5sum = _make_hex_prop('md5sum', 32)
99     perm_sig = _make_hex_prop('perm_sig', 40)
100
101     @property
102     def perm_expiry(self):
103         return self._perm_expiry
104
105     @perm_expiry.setter
106     def perm_expiry(self, value):
107         if not arvados.util.is_hex(value, 1, 8):
108             raise ValueError(
109                 "permission timestamp must be a hex Unix timestamp: {}".
110                 format(value))
111         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
112
113     def permission_hint(self):
114         data = [self.perm_sig, self.perm_expiry]
115         if None in data:
116             return None
117         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
118         return "A{}@{:08x}".format(*data)
119
120     def parse_permission_hint(self, s):
121         try:
122             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
123         except IndexError:
124             raise ValueError("bad permission hint {}".format(s))
125
126     def permission_expired(self, as_of_dt=None):
127         if self.perm_expiry is None:
128             return False
129         elif as_of_dt is None:
130             as_of_dt = datetime.datetime.now()
131         return self.perm_expiry <= as_of_dt
132
133
134 class KeepBlockCache(object):
135     def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
136         self.cache_max = cache_max
137         self._cache = collections.OrderedDict()
138         self._cache_lock = threading.Lock()
139         self._max_slots = max_slots
140         self._disk_cache = disk_cache
141         self._disk_cache_dir = disk_cache_dir
142         self._cache_updating = threading.Condition(self._cache_lock)
143
144         if self._disk_cache and self._disk_cache_dir is None:
145             self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep'))
146
147         if self._max_slots == 0:
148             if self._disk_cache:
149                 # Each block uses two file descriptors, one used to
150                 # open it initially and hold the flock(), and a second
151                 # hidden one used by mmap().
152                 #
153                 # Set max slots to 1/8 of maximum file handles.  This
154                 # means we'll use at most 1/4 of total file handles.
155                 #
156                 # NOFILE typically defaults to 1024 on Linux so this
157                 # is 128 slots (256 file handles), which means we can
158                 # cache up to 8 GiB of 64 MiB blocks.  This leaves
159                 # 768 file handles for sockets and other stuff.
160                 #
161                 # When we want the ability to have more cache (e.g. in
162                 # arv-mount) we'll increase rlimit before calling
163                 # this.
164                 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
165             else:
166                 # RAM cache slots
167                 self._max_slots = 512
168
169         if self.cache_max == 0:
170             if self._disk_cache:
171                 fs = os.statvfs(self._disk_cache_dir)
172                 # Calculation of available space incorporates existing cache usage
173                 existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
174                 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
175                 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
176                 # pick smallest of:
177                 # 10% of total disk size
178                 # 25% of available space
179                 # max_slots * 64 MiB
180                 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
181             else:
182                 # 256 MiB in RAM
183                 self.cache_max = (256 * 1024 * 1024)
184
185         self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
186
187         self.cache_total = 0
188         if self._disk_cache:
189             self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
190             for slot in self._cache.values():
191                 self.cache_total += slot.size()
192             self.cap_cache()
193
194     class _CacheSlot:
195         __slots__ = ("locator", "ready", "content")
196
197         def __init__(self, locator):
198             self.locator = locator
199             self.ready = threading.Event()
200             self.content = None
201
202         def get(self):
203             self.ready.wait()
204             return self.content
205
206         def set(self, value):
207             if self.content is not None:
208                 return False
209             self.content = value
210             self.ready.set()
211             return True
212
213         def size(self):
214             if self.content is None:
215                 return 0
216             else:
217                 return len(self.content)
218
219         def evict(self):
220             self.content = None
221
222
223     def _resize_cache(self, cache_max, max_slots):
224         # Try and make sure the contents of the cache do not exceed
225         # the supplied maximums.
226
227         if self.cache_total <= cache_max and len(self._cache) <= max_slots:
228             return
229
230         _evict_candidates = collections.deque(self._cache.values())
231         while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
232             slot = _evict_candidates.popleft()
233             if not slot.ready.is_set():
234                 continue
235
236             sz = slot.size()
237             slot.evict()
238             self.cache_total -= sz
239             del self._cache[slot.locator]
240
241
242     def cap_cache(self):
243         '''Cap the cache size to self.cache_max'''
244         with self._cache_updating:
245             self._resize_cache(self.cache_max, self._max_slots)
246             self._cache_updating.notify_all()
247
248     def _get(self, locator):
249         # Test if the locator is already in the cache
250         if locator in self._cache:
251             n = self._cache[locator]
252             if n.ready.is_set() and n.content is None:
253                 del self._cache[n.locator]
254                 return None
255             self._cache.move_to_end(locator)
256             return n
257         if self._disk_cache:
258             # see if it exists on disk
259             n = diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
260             if n is not None:
261                 self._cache[n.locator] = n
262                 self.cache_total += n.size()
263                 return n
264         return None
265
266     def get(self, locator):
267         with self._cache_lock:
268             return self._get(locator)
269
270     def reserve_cache(self, locator):
271         '''Reserve a cache slot for the specified locator,
272         or return the existing slot.'''
273         with self._cache_updating:
274             n = self._get(locator)
275             if n:
276                 return n, False
277             else:
278                 # Add a new cache slot for the locator
279                 self._resize_cache(self.cache_max, self._max_slots-1)
280                 while len(self._cache) >= self._max_slots:
281                     # If there isn't a slot available, need to wait
282                     # for something to happen that releases one of the
283                     # cache slots.  Idle for 200 ms or woken up by
284                     # another thread
285                     self._cache_updating.wait(timeout=0.2)
286                     self._resize_cache(self.cache_max, self._max_slots-1)
287
288                 if self._disk_cache:
289                     n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
290                 else:
291                     n = KeepBlockCache._CacheSlot(locator)
292                 self._cache[n.locator] = n
293                 return n, True
294
295     def set(self, slot, blob):
296         try:
297             if slot.set(blob):
298                 self.cache_total += slot.size()
299             return
300         except OSError as e:
301             if e.errno == errno.ENOMEM:
302                 # Reduce max slots to current - 4, cap cache and retry
303                 with self._cache_lock:
304                     self._max_slots = max(4, len(self._cache) - 4)
305             elif e.errno == errno.ENOSPC:
306                 # Reduce disk max space to current - 256 MiB, cap cache and retry
307                 with self._cache_lock:
308                     sm = sum(st.size() for st in self._cache.values())
309                     self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
310             elif e.errno == errno.ENODEV:
311                 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
312         except Exception as e:
313             pass
314         finally:
315             # Check if we should evict things from the cache.  Either
316             # because we added a new thing or there was an error and
317             # we possibly adjusted the limits down, so we might need
318             # to push something out.
319             self.cap_cache()
320
321         try:
322             # Only gets here if there was an error the first time. The
323             # exception handler adjusts limits downward in some cases
324             # to free up resources, which would make the operation
325             # succeed.
326             if slot.set(blob):
327                 self.cache_total += slot.size()
328         except Exception as e:
329             # It failed again.  Give up.
330             slot.set(None)
331             raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
332
333         self.cap_cache()
334
335
336 class _Counter:
337     def __init__(self, v=0):
338         self._lk = threading.Lock()
339         self._val = v
340
341     def add(self, v):
342         with self._lk:
343             self._val += v
344
345     def get(self):
346         with self._lk:
347             return self._val
348
349
350 class KeepClient(object):
351     DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
352     DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
353
354     class _KeepService(PyCurlHelper):
355         """Make requests to a single Keep service, and track results.
356
357         A _KeepService is intended to last long enough to perform one
358         transaction (GET or PUT) against one Keep service. This can
359         involve calling either get() or put() multiple times in order
360         to retry after transient failures. However, calling both get()
361         and put() on a single instance -- or using the same instance
362         to access two different Keep services -- will not produce
363         sensible behavior.
364         """
365
366         HTTP_ERRORS = (
367             socket.error,
368             ssl.SSLError,
369             arvados.errors.HttpError,
370         )
371
372         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
373                      upload_counter=None,
374                      download_counter=None,
375                      headers={},
376                      insecure=False):
377             super().__init__()
378             self.root = root
379             self._user_agent_pool = user_agent_pool
380             self._result = {'error': None}
381             self._usable = True
382             self._session = None
383             self._socket = None
384             self.get_headers = {'Accept': 'application/octet-stream'}
385             self.get_headers.update(headers)
386             self.put_headers = headers
387             self.upload_counter = upload_counter
388             self.download_counter = download_counter
389             self.insecure = insecure
390
391         def usable(self):
392             """Is it worth attempting a request?"""
393             return self._usable
394
395         def finished(self):
396             """Did the request succeed or encounter permanent failure?"""
397             return self._result['error'] == False or not self._usable
398
399         def last_result(self):
400             return self._result
401
402         def _get_user_agent(self):
403             try:
404                 return self._user_agent_pool.get(block=False)
405             except queue.Empty:
406                 return pycurl.Curl()
407
408         def _put_user_agent(self, ua):
409             try:
410                 ua.reset()
411                 self._user_agent_pool.put(ua, block=False)
412             except:
413                 ua.close()
414
415         def get(self, locator, method="GET", timeout=None):
416             # locator is a KeepLocator object.
417             url = self.root + str(locator)
418             _logger.debug("Request: %s %s", method, url)
419             curl = self._get_user_agent()
420             ok = None
421             try:
422                 with Timer() as t:
423                     self._headers = {}
424                     response_body = BytesIO()
425                     curl.setopt(pycurl.NOSIGNAL, 1)
426                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
427                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
428                     curl.setopt(pycurl.URL, url.encode('utf-8'))
429                     curl.setopt(pycurl.HTTPHEADER, [
430                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
431                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
432                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
433                     if self.insecure:
434                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
435                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
436                     else:
437                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
438                     if method == "HEAD":
439                         curl.setopt(pycurl.NOBODY, True)
440                     else:
441                         curl.setopt(pycurl.HTTPGET, True)
442                     self._setcurltimeouts(curl, timeout, method=="HEAD")
443
444                     try:
445                         curl.perform()
446                     except Exception as e:
447                         raise arvados.errors.HttpError(0, str(e))
448                     finally:
449                         if self._socket:
450                             self._socket.close()
451                             self._socket = None
452                     self._result = {
453                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
454                         'body': response_body.getvalue(),
455                         'headers': self._headers,
456                         'error': False,
457                     }
458
459                 ok = retry.check_http_response_success(self._result['status_code'])
460                 if not ok:
461                     self._result['error'] = arvados.errors.HttpError(
462                         self._result['status_code'],
463                         self._headers.get('x-status-line', 'Error'))
464             except self.HTTP_ERRORS as e:
465                 self._result = {
466                     'error': e,
467                 }
468             self._usable = ok != False
469             if self._result.get('status_code', None):
470                 # The client worked well enough to get an HTTP status
471                 # code, so presumably any problems are just on the
472                 # server side and it's OK to reuse the client.
473                 self._put_user_agent(curl)
474             else:
475                 # Don't return this client to the pool, in case it's
476                 # broken.
477                 curl.close()
478             if not ok:
479                 _logger.debug("Request fail: GET %s => %s: %s",
480                               url, type(self._result['error']), str(self._result['error']))
481                 return None
482             if method == "HEAD":
483                 _logger.info("HEAD %s: %s bytes",
484                          self._result['status_code'],
485                          self._result.get('content-length'))
486                 if self._result['headers'].get('x-keep-locator'):
487                     # This is a response to a remote block copy request, return
488                     # the local copy block locator.
489                     return self._result['headers'].get('x-keep-locator')
490                 return True
491
492             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
493                          self._result['status_code'],
494                          len(self._result['body']),
495                          t.msecs,
496                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
497
498             if self.download_counter:
499                 self.download_counter.add(len(self._result['body']))
500             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
501             if resp_md5 != locator.md5sum:
502                 _logger.warning("Checksum fail: md5(%s) = %s",
503                                 url, resp_md5)
504                 self._result['error'] = arvados.errors.HttpError(
505                     0, 'Checksum fail')
506                 return None
507             return self._result['body']
508
509         def put(self, hash_s, body, timeout=None, headers={}):
510             put_headers = copy.copy(self.put_headers)
511             put_headers.update(headers)
512             url = self.root + hash_s
513             _logger.debug("Request: PUT %s", url)
514             curl = self._get_user_agent()
515             ok = None
516             try:
517                 with Timer() as t:
518                     self._headers = {}
519                     body_reader = BytesIO(body)
520                     response_body = BytesIO()
521                     curl.setopt(pycurl.NOSIGNAL, 1)
522                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
523                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
524                     curl.setopt(pycurl.URL, url.encode('utf-8'))
525                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
526                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
527                     # response) instead of sending the request body immediately.
528                     # This allows the server to reject the request if the request
529                     # is invalid or the server is read-only, without waiting for
530                     # the client to send the entire block.
531                     curl.setopt(pycurl.UPLOAD, True)
532                     curl.setopt(pycurl.INFILESIZE, len(body))
533                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
534                     curl.setopt(pycurl.HTTPHEADER, [
535                         '{}: {}'.format(k,v) for k,v in put_headers.items()])
536                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
537                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
538                     if self.insecure:
539                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
540                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
541                     else:
542                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
543                     self._setcurltimeouts(curl, timeout)
544                     try:
545                         curl.perform()
546                     except Exception as e:
547                         raise arvados.errors.HttpError(0, str(e))
548                     finally:
549                         if self._socket:
550                             self._socket.close()
551                             self._socket = None
552                     self._result = {
553                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
554                         'body': response_body.getvalue().decode('utf-8'),
555                         'headers': self._headers,
556                         'error': False,
557                     }
558                 ok = retry.check_http_response_success(self._result['status_code'])
559                 if not ok:
560                     self._result['error'] = arvados.errors.HttpError(
561                         self._result['status_code'],
562                         self._headers.get('x-status-line', 'Error'))
563             except self.HTTP_ERRORS as e:
564                 self._result = {
565                     'error': e,
566                 }
567             self._usable = ok != False # still usable if ok is True or None
568             if self._result.get('status_code', None):
569                 # Client is functional. See comment in get().
570                 self._put_user_agent(curl)
571             else:
572                 curl.close()
573             if not ok:
574                 _logger.debug("Request fail: PUT %s => %s: %s",
575                               url, type(self._result['error']), str(self._result['error']))
576                 return False
577             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
578                          self._result['status_code'],
579                          len(body),
580                          t.msecs,
581                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
582             if self.upload_counter:
583                 self.upload_counter.add(len(body))
584             return True
585
586
587     class _KeepWriterQueue(queue.Queue):
588         def __init__(self, copies, classes=[]):
589             queue.Queue.__init__(self) # Old-style superclass
590             self.wanted_copies = copies
591             self.wanted_storage_classes = classes
592             self.successful_copies = 0
593             self.confirmed_storage_classes = {}
594             self.response = None
595             self.storage_classes_tracking = True
596             self.queue_data_lock = threading.RLock()
597             self.pending_tries = max(copies, len(classes))
598             self.pending_tries_notification = threading.Condition()
599
600         def write_success(self, response, replicas_nr, classes_confirmed):
601             with self.queue_data_lock:
602                 self.successful_copies += replicas_nr
603                 if classes_confirmed is None:
604                     self.storage_classes_tracking = False
605                 elif self.storage_classes_tracking:
606                     for st_class, st_copies in classes_confirmed.items():
607                         try:
608                             self.confirmed_storage_classes[st_class] += st_copies
609                         except KeyError:
610                             self.confirmed_storage_classes[st_class] = st_copies
611                     self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
612                 self.response = response
613             with self.pending_tries_notification:
614                 self.pending_tries_notification.notify_all()
615
616         def write_fail(self, ks):
617             with self.pending_tries_notification:
618                 self.pending_tries += 1
619                 self.pending_tries_notification.notify()
620
621         def pending_copies(self):
622             with self.queue_data_lock:
623                 return self.wanted_copies - self.successful_copies
624
625         def satisfied_classes(self):
626             with self.queue_data_lock:
627                 if not self.storage_classes_tracking:
628                     # Notifies disabled storage classes expectation to
629                     # the outer loop.
630                     return None
631             return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
632
633         def pending_classes(self):
634             with self.queue_data_lock:
635                 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
636                     return []
637                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
638                 for st_class, st_copies in self.confirmed_storage_classes.items():
639                     if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
640                         unsatisfied_classes.remove(st_class)
641                 return unsatisfied_classes
642
643         def get_next_task(self):
644             with self.pending_tries_notification:
645                 while True:
646                     if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
647                         # This notify_all() is unnecessary --
648                         # write_success() already called notify_all()
649                         # when pending<1 became true, so it's not
650                         # possible for any other thread to be in
651                         # wait() now -- but it's cheap insurance
652                         # against deadlock so we do it anyway:
653                         self.pending_tries_notification.notify_all()
654                         # Drain the queue and then raise Queue.Empty
655                         while True:
656                             self.get_nowait()
657                             self.task_done()
658                     elif self.pending_tries > 0:
659                         service, service_root = self.get_nowait()
660                         if service.finished():
661                             self.task_done()
662                             continue
663                         self.pending_tries -= 1
664                         return service, service_root
665                     elif self.empty():
666                         self.pending_tries_notification.notify_all()
667                         raise queue.Empty
668                     else:
669                         self.pending_tries_notification.wait()
670
671
672     class _KeepWriterThreadPool:
673         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
674             self.total_task_nr = 0
675             if (not max_service_replicas) or (max_service_replicas >= copies):
676                 num_threads = 1
677             else:
678                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
679             _logger.debug("Pool max threads is %d", num_threads)
680             self.workers = []
681             self.queue = KeepClient._KeepWriterQueue(copies, classes)
682             # Create workers
683             for _ in range(num_threads):
684                 w = KeepClient._KeepWriterThread(self.queue, data, data_hash, timeout)
685                 self.workers.append(w)
686
687         def add_task(self, ks, service_root):
688             self.queue.put((ks, service_root))
689             self.total_task_nr += 1
690
691         def done(self):
692             return self.queue.successful_copies, self.queue.satisfied_classes()
693
694         def join(self):
695             # Start workers
696             for worker in self.workers:
697                 worker.start()
698             # Wait for finished work
699             self.queue.join()
700
701         def response(self):
702             return self.queue.response
703
704
705     class _KeepWriterThread(threading.Thread):
706         class TaskFailed(RuntimeError):
707             """Exception for failed Keep writes
708
709             TODO: Move this class to the module top level and document it
710
711             @private
712             """
713
714
715         def __init__(self, queue, data, data_hash, timeout=None):
716             super().__init__()
717             self.timeout = timeout
718             self.queue = queue
719             self.data = data
720             self.data_hash = data_hash
721             self.daemon = True
722
723         def run(self):
724             while True:
725                 try:
726                     service, service_root = self.queue.get_next_task()
727                 except queue.Empty:
728                     return
729                 try:
730                     locator, copies, classes = self.do_task(service, service_root)
731                 except Exception as e:
732                     if not isinstance(e, self.TaskFailed):
733                         _logger.exception("Exception in _KeepWriterThread")
734                     self.queue.write_fail(service)
735                 else:
736                     self.queue.write_success(locator, copies, classes)
737                 finally:
738                     self.queue.task_done()
739
740         def do_task(self, service, service_root):
741             classes = self.queue.pending_classes()
742             headers = {}
743             if len(classes) > 0:
744                 classes.sort()
745                 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
746             success = bool(service.put(self.data_hash,
747                                         self.data,
748                                         timeout=self.timeout,
749                                         headers=headers))
750             result = service.last_result()
751
752             if not success:
753                 if result.get('status_code'):
754                     _logger.debug("Request fail: PUT %s => %s %s",
755                                   self.data_hash,
756                                   result.get('status_code'),
757                                   result.get('body'))
758                 raise self.TaskFailed()
759
760             _logger.debug("_KeepWriterThread %s succeeded %s+%i %s",
761                           str(threading.current_thread()),
762                           self.data_hash,
763                           len(self.data),
764                           service_root)
765             try:
766                 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
767             except (KeyError, ValueError):
768                 replicas_stored = 1
769
770             classes_confirmed = {}
771             try:
772                 scch = result['headers']['x-keep-storage-classes-confirmed']
773                 for confirmation in scch.replace(' ', '').split(','):
774                     if '=' in confirmation:
775                         stored_class, stored_copies = confirmation.split('=')[:2]
776                         classes_confirmed[stored_class] = int(stored_copies)
777             except (KeyError, ValueError):
778                 # Storage classes confirmed header missing or corrupt
779                 classes_confirmed = None
780
781             return result['body'].strip(), replicas_stored, classes_confirmed
782
783
784     def __init__(self, api_client=None, proxy=None,
785                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
786                  api_token=None, local_store=None, block_cache=None,
787                  num_retries=10, session=None, num_prefetch_threads=None):
788         """Initialize a new KeepClient.
789
790         Arguments:
791         :api_client:
792           The API client to use to find Keep services.  If not
793           provided, KeepClient will build one from available Arvados
794           configuration.
795
796         :proxy:
797           If specified, this KeepClient will send requests to this Keep
798           proxy.  Otherwise, KeepClient will fall back to the setting of the
799           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
800           If you want to KeepClient does not use a proxy, pass in an empty
801           string.
802
803         :timeout:
804           The initial timeout (in seconds) for HTTP requests to Keep
805           non-proxy servers.  A tuple of three floats is interpreted as
806           (connection_timeout, read_timeout, minimum_bandwidth). A connection
807           will be aborted if the average traffic rate falls below
808           minimum_bandwidth bytes per second over an interval of read_timeout
809           seconds. Because timeouts are often a result of transient server
810           load, the actual connection timeout will be increased by a factor
811           of two on each retry.
812           Default: (2, 256, 32768).
813
814         :proxy_timeout:
815           The initial timeout (in seconds) for HTTP requests to
816           Keep proxies. A tuple of three floats is interpreted as
817           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
818           described above for adjusting connection timeouts on retry also
819           applies.
820           Default: (20, 256, 32768).
821
822         :api_token:
823           If you're not using an API client, but only talking
824           directly to a Keep proxy, this parameter specifies an API token
825           to authenticate Keep requests.  It is an error to specify both
826           api_client and api_token.  If you specify neither, KeepClient
827           will use one available from the Arvados configuration.
828
829         :local_store:
830           If specified, this KeepClient will bypass Keep
831           services, and save data to the named directory.  If unspecified,
832           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
833           environment variable.  If you want to ensure KeepClient does not
834           use local storage, pass in an empty string.  This is primarily
835           intended to mock a server for testing.
836
837         :num_retries:
838           The default number of times to retry failed requests.
839           This will be used as the default num_retries value when get() and
840           put() are called.  Default 10.
841         """
842         self.lock = threading.Lock()
843         if proxy is None:
844             if config.get('ARVADOS_KEEP_SERVICES'):
845                 proxy = config.get('ARVADOS_KEEP_SERVICES')
846             else:
847                 proxy = config.get('ARVADOS_KEEP_PROXY')
848         if api_token is None:
849             if api_client is None:
850                 api_token = config.get('ARVADOS_API_TOKEN')
851             else:
852                 api_token = api_client.api_token
853         elif api_client is not None:
854             raise ValueError(
855                 "can't build KeepClient with both API client and token")
856         if local_store is None:
857             local_store = os.environ.get('KEEP_LOCAL_STORE')
858
859         if api_client is None:
860             self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
861         else:
862             self.insecure = api_client.insecure
863
864         self.block_cache = block_cache if block_cache else KeepBlockCache()
865         self.timeout = timeout
866         self.proxy_timeout = proxy_timeout
867         self._user_agent_pool = queue.LifoQueue()
868         self.upload_counter = _Counter()
869         self.download_counter = _Counter()
870         self.put_counter = _Counter()
871         self.get_counter = _Counter()
872         self.hits_counter = _Counter()
873         self.misses_counter = _Counter()
874         self._storage_classes_unsupported_warning = False
875         self._default_classes = []
876         if num_prefetch_threads is not None:
877             self.num_prefetch_threads = num_prefetch_threads
878         else:
879             self.num_prefetch_threads = 2
880         self._prefetch_queue = None
881         self._prefetch_threads = None
882
883         if local_store:
884             self.local_store = local_store
885             self.head = self.local_store_head
886             self.get = self.local_store_get
887             self.put = self.local_store_put
888         else:
889             self.num_retries = num_retries
890             self.max_replicas_per_service = None
891             if proxy:
892                 proxy_uris = proxy.split()
893                 for i in range(len(proxy_uris)):
894                     if not proxy_uris[i].endswith('/'):
895                         proxy_uris[i] += '/'
896                     # URL validation
897                     url = urllib.parse.urlparse(proxy_uris[i])
898                     if not (url.scheme and url.netloc):
899                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
900                 self.api_token = api_token
901                 self._gateway_services = {}
902                 self._keep_services = [{
903                     'uuid': "00000-bi6l4-%015d" % idx,
904                     'service_type': 'proxy',
905                     '_service_root': uri,
906                     } for idx, uri in enumerate(proxy_uris)]
907                 self._writable_services = self._keep_services
908                 self.using_proxy = True
909                 self._static_services_list = True
910             else:
911                 # It's important to avoid instantiating an API client
912                 # unless we actually need one, for testing's sake.
913                 if api_client is None:
914                     api_client = arvados.api('v1')
915                 self.api_client = api_client
916                 self.api_token = api_client.api_token
917                 self._gateway_services = {}
918                 self._keep_services = None
919                 self._writable_services = None
920                 self.using_proxy = None
921                 self._static_services_list = False
922                 try:
923                     self._default_classes = [
924                         k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
925                 except KeyError:
926                     # We're talking to an old cluster
927                     pass
928
929     def current_timeout(self, attempt_number):
930         """Return the appropriate timeout to use for this client.
931
932         The proxy timeout setting if the backend service is currently a proxy,
933         the regular timeout setting otherwise.  The `attempt_number` indicates
934         how many times the operation has been tried already (starting from 0
935         for the first try), and scales the connection timeout portion of the
936         return value accordingly.
937
938         """
939         # TODO(twp): the timeout should be a property of a
940         # _KeepService, not a KeepClient. See #4488.
941         t = self.proxy_timeout if self.using_proxy else self.timeout
942         if len(t) == 2:
943             return (t[0] * (1 << attempt_number), t[1])
944         else:
945             return (t[0] * (1 << attempt_number), t[1], t[2])
946     def _any_nondisk_services(self, service_list):
947         return any(ks.get('service_type', 'disk') != 'disk'
948                    for ks in service_list)
949
950     def build_services_list(self, force_rebuild=False):
951         if (self._static_services_list or
952               (self._keep_services and not force_rebuild)):
953             return
954         with self.lock:
955             try:
956                 keep_services = self.api_client.keep_services().accessible()
957             except Exception:  # API server predates Keep services.
958                 keep_services = self.api_client.keep_disks().list()
959
960             # Gateway services are only used when specified by UUID,
961             # so there's nothing to gain by filtering them by
962             # service_type.
963             self._gateway_services = {ks['uuid']: ks for ks in
964                                       keep_services.execute()['items']}
965             if not self._gateway_services:
966                 raise arvados.errors.NoKeepServersError()
967
968             # Precompute the base URI for each service.
969             for r in self._gateway_services.values():
970                 host = r['service_host']
971                 if not host.startswith('[') and host.find(':') >= 0:
972                     # IPv6 URIs must be formatted like http://[::1]:80/...
973                     host = '[' + host + ']'
974                 r['_service_root'] = "{}://{}:{:d}/".format(
975                     'https' if r['service_ssl_flag'] else 'http',
976                     host,
977                     r['service_port'])
978
979             _logger.debug(str(self._gateway_services))
980             self._keep_services = [
981                 ks for ks in self._gateway_services.values()
982                 if not ks.get('service_type', '').startswith('gateway:')]
983             self._writable_services = [ks for ks in self._keep_services
984                                        if not ks.get('read_only')]
985
986             # For disk type services, max_replicas_per_service is 1
987             # It is unknown (unlimited) for other service types.
988             if self._any_nondisk_services(self._writable_services):
989                 self.max_replicas_per_service = None
990             else:
991                 self.max_replicas_per_service = 1
992
993     def _service_weight(self, data_hash, service_uuid):
994         """Compute the weight of a Keep service endpoint for a data
995         block with a known hash.
996
997         The weight is md5(h + u) where u is the last 15 characters of
998         the service endpoint's UUID.
999         """
1000         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1001
1002     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1003         """Return an array of Keep service endpoints, in the order in
1004         which they should be probed when reading or writing data with
1005         the given hash+hints.
1006         """
1007         self.build_services_list(force_rebuild)
1008
1009         sorted_roots = []
1010         # Use the services indicated by the given +K@... remote
1011         # service hints, if any are present and can be resolved to a
1012         # URI.
1013         for hint in locator.hints:
1014             if hint.startswith('K@'):
1015                 if len(hint) == 7:
1016                     sorted_roots.append(
1017                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1018                 elif len(hint) == 29:
1019                     svc = self._gateway_services.get(hint[2:])
1020                     if svc:
1021                         sorted_roots.append(svc['_service_root'])
1022
1023         # Sort the available local services by weight (heaviest first)
1024         # for this locator, and return their service_roots (base URIs)
1025         # in that order.
1026         use_services = self._keep_services
1027         if need_writable:
1028             use_services = self._writable_services
1029         self.using_proxy = self._any_nondisk_services(use_services)
1030         sorted_roots.extend([
1031             svc['_service_root'] for svc in sorted(
1032                 use_services,
1033                 reverse=True,
1034                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1035         _logger.debug("{}: {}".format(locator, sorted_roots))
1036         return sorted_roots
1037
1038     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1039         # roots_map is a dictionary, mapping Keep service root strings
1040         # to _KeepService objects.  Poll for Keep services, and add any
1041         # new ones to roots_map.  Return the current list of local
1042         # root strings.
1043         headers.setdefault('Authorization', "Bearer %s" % (self.api_token,))
1044         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1045         for root in local_roots:
1046             if root not in roots_map:
1047                 roots_map[root] = self._KeepService(
1048                     root, self._user_agent_pool,
1049                     upload_counter=self.upload_counter,
1050                     download_counter=self.download_counter,
1051                     headers=headers,
1052                     insecure=self.insecure)
1053         return local_roots
1054
1055     @staticmethod
1056     def _check_loop_result(result):
1057         # KeepClient RetryLoops should save results as a 2-tuple: the
1058         # actual result of the request, and the number of servers available
1059         # to receive the request this round.
1060         # This method returns True if there's a real result, False if
1061         # there are no more servers available, otherwise None.
1062         if isinstance(result, Exception):
1063             return None
1064         result, tried_server_count = result
1065         if (result is not None) and (result is not False):
1066             return True
1067         elif tried_server_count < 1:
1068             _logger.info("No more Keep services to try; giving up")
1069             return False
1070         else:
1071             return None
1072
1073     def get_from_cache(self, loc_s):
1074         """Fetch a block only if is in the cache, otherwise return None."""
1075         locator = KeepLocator(loc_s)
1076         slot = self.block_cache.get(locator.md5sum)
1077         if slot is not None and slot.ready.is_set():
1078             return slot.get()
1079         else:
1080             return None
1081
1082     def refresh_signature(self, loc):
1083         """Ask Keep to get the remote block and return its local signature"""
1084         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1085         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1086
1087     @retry.retry_method
1088     def head(self, loc_s, **kwargs):
1089         return self._get_or_head(loc_s, method="HEAD", **kwargs)
1090
1091     @retry.retry_method
1092     def get(self, loc_s, **kwargs):
1093         return self._get_or_head(loc_s, method="GET", **kwargs)
1094
1095     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1096         """Get data from Keep.
1097
1098         This method fetches one or more blocks of data from Keep.  It
1099         sends a request each Keep service registered with the API
1100         server (or the proxy provided when this client was
1101         instantiated), then each service named in location hints, in
1102         sequence.  As soon as one service provides the data, it's
1103         returned.
1104
1105         Arguments:
1106         * loc_s: A string of one or more comma-separated locators to fetch.
1107           This method returns the concatenation of these blocks.
1108         * num_retries: The number of times to retry GET requests to
1109           *each* Keep server if it returns temporary failures, with
1110           exponential backoff.  Note that, in each loop, the method may try
1111           to fetch data from every available Keep service, along with any
1112           that are named in location hints in the locator.  The default value
1113           is set when the KeepClient is initialized.
1114         """
1115         if ',' in loc_s:
1116             return ''.join(self.get(x) for x in loc_s.split(','))
1117
1118         self.get_counter.add(1)
1119
1120         request_id = (request_id or
1121                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1122                       arvados.util.new_request_id())
1123         if headers is None:
1124             headers = {}
1125         headers['X-Request-Id'] = request_id
1126
1127         slot = None
1128         blob = None
1129         try:
1130             locator = KeepLocator(loc_s)
1131             if method == "GET":
1132                 while slot is None:
1133                     slot, first = self.block_cache.reserve_cache(locator.md5sum)
1134                     if first:
1135                         # Fresh and empty "first time it is used" slot
1136                         break
1137                     if prefetch:
1138                         # this is request for a prefetch to fill in
1139                         # the cache, don't need to wait for the
1140                         # result, so if it is already in flight return
1141                         # immediately.  Clear 'slot' to prevent
1142                         # finally block from calling slot.set()
1143                         if slot.ready.is_set():
1144                             slot.get()
1145                         slot = None
1146                         return None
1147
1148                     blob = slot.get()
1149                     if blob is not None:
1150                         self.hits_counter.add(1)
1151                         return blob
1152
1153                     # If blob is None, this means either
1154                     #
1155                     # (a) another thread was fetching this block and
1156                     # failed with an error or
1157                     #
1158                     # (b) cache thrashing caused the slot to be
1159                     # evicted (content set to None) by another thread
1160                     # between the call to reserve_cache() and get().
1161                     #
1162                     # We'll handle these cases by reserving a new slot
1163                     # and then doing a full GET request.
1164                     slot = None
1165
1166             self.misses_counter.add(1)
1167
1168             # If the locator has hints specifying a prefix (indicating a
1169             # remote keepproxy) or the UUID of a local gateway service,
1170             # read data from the indicated service(s) instead of the usual
1171             # list of local disk services.
1172             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1173                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1174             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1175                                for hint in locator.hints if (
1176                                        hint.startswith('K@') and
1177                                        len(hint) == 29 and
1178                                        self._gateway_services.get(hint[2:])
1179                                        )])
1180             # Map root URLs to their _KeepService objects.
1181             roots_map = {
1182                 root: self._KeepService(root, self._user_agent_pool,
1183                                        upload_counter=self.upload_counter,
1184                                        download_counter=self.download_counter,
1185                                        headers=headers,
1186                                        insecure=self.insecure)
1187                 for root in hint_roots
1188             }
1189
1190             # See #3147 for a discussion of the loop implementation.  Highlights:
1191             # * Refresh the list of Keep services after each failure, in case
1192             #   it's being updated.
1193             # * Retry until we succeed, we're out of retries, or every available
1194             #   service has returned permanent failure.
1195             sorted_roots = []
1196             roots_map = {}
1197             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1198                                    backoff_start=2)
1199             for tries_left in loop:
1200                 try:
1201                     sorted_roots = self.map_new_services(
1202                         roots_map, locator,
1203                         force_rebuild=(tries_left < num_retries),
1204                         need_writable=False,
1205                         headers=headers)
1206                 except Exception as error:
1207                     loop.save_result(error)
1208                     continue
1209
1210                 # Query _KeepService objects that haven't returned
1211                 # permanent failure, in our specified shuffle order.
1212                 services_to_try = [roots_map[root]
1213                                    for root in sorted_roots
1214                                    if roots_map[root].usable()]
1215                 for keep_service in services_to_try:
1216                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1217                     if blob is not None:
1218                         break
1219                 loop.save_result((blob, len(services_to_try)))
1220
1221             # Always cache the result, then return it if we succeeded.
1222             if loop.success():
1223                 return blob
1224         finally:
1225             if slot is not None:
1226                 self.block_cache.set(slot, blob)
1227
1228         # Q: Including 403 is necessary for the Keep tests to continue
1229         # passing, but maybe they should expect KeepReadError instead?
1230         not_founds = sum(1 for key in sorted_roots
1231                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1232         service_errors = ((key, roots_map[key].last_result()['error'])
1233                           for key in sorted_roots)
1234         if not roots_map:
1235             raise arvados.errors.KeepReadError(
1236                 "[{}] failed to read {}: no Keep services available ({})".format(
1237                     request_id, loc_s, loop.last_result()))
1238         elif not_founds == len(sorted_roots):
1239             raise arvados.errors.NotFoundError(
1240                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1241         else:
1242             raise arvados.errors.KeepReadError(
1243                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1244
1245     @retry.retry_method
1246     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1247         """Save data in Keep.
1248
1249         This method will get a list of Keep services from the API server, and
1250         send the data to each one simultaneously in a new thread.  Once the
1251         uploads are finished, if enough copies are saved, this method returns
1252         the most recent HTTP response body.  If requests fail to upload
1253         enough copies, this method raises KeepWriteError.
1254
1255         Arguments:
1256         * data: The string of data to upload.
1257         * copies: The number of copies that the user requires be saved.
1258           Default 2.
1259         * num_retries: The number of times to retry PUT requests to
1260           *each* Keep server if it returns temporary failures, with
1261           exponential backoff.  The default value is set when the
1262           KeepClient is initialized.
1263         * classes: An optional list of storage class names where copies should
1264           be written.
1265         """
1266
1267         classes = classes or self._default_classes
1268
1269         if not isinstance(data, bytes):
1270             data = data.encode()
1271
1272         self.put_counter.add(1)
1273
1274         data_hash = hashlib.md5(data).hexdigest()
1275         loc_s = data_hash + '+' + str(len(data))
1276         if copies < 1:
1277             return loc_s
1278         locator = KeepLocator(loc_s)
1279
1280         request_id = (request_id or
1281                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1282                       arvados.util.new_request_id())
1283         headers = {
1284             'X-Request-Id': request_id,
1285             'X-Keep-Desired-Replicas': str(copies),
1286         }
1287         roots_map = {}
1288         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1289                                backoff_start=2)
1290         done_copies = 0
1291         done_classes = []
1292         for tries_left in loop:
1293             try:
1294                 sorted_roots = self.map_new_services(
1295                     roots_map, locator,
1296                     force_rebuild=(tries_left < num_retries),
1297                     need_writable=True,
1298                     headers=headers)
1299             except Exception as error:
1300                 loop.save_result(error)
1301                 continue
1302
1303             pending_classes = []
1304             if done_classes is not None:
1305                 pending_classes = list(set(classes) - set(done_classes))
1306             writer_pool = KeepClient._KeepWriterThreadPool(
1307                 data=data,
1308                 data_hash=data_hash,
1309                 copies=copies - done_copies,
1310                 max_service_replicas=self.max_replicas_per_service,
1311                 timeout=self.current_timeout(num_retries - tries_left),
1312                 classes=pending_classes,
1313             )
1314             for service_root, ks in [(root, roots_map[root])
1315                                      for root in sorted_roots]:
1316                 if ks.finished():
1317                     continue
1318                 writer_pool.add_task(ks, service_root)
1319             writer_pool.join()
1320             pool_copies, pool_classes = writer_pool.done()
1321             done_copies += pool_copies
1322             if (done_classes is not None) and (pool_classes is not None):
1323                 done_classes += pool_classes
1324                 loop.save_result(
1325                     (done_copies >= copies and set(done_classes) == set(classes),
1326                     writer_pool.total_task_nr))
1327             else:
1328                 # Old keepstore contacted without storage classes support:
1329                 # success is determined only by successful copies.
1330                 #
1331                 # Disable storage classes tracking from this point forward.
1332                 if not self._storage_classes_unsupported_warning:
1333                     self._storage_classes_unsupported_warning = True
1334                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1335                 done_classes = None
1336                 loop.save_result(
1337                     (done_copies >= copies, writer_pool.total_task_nr))
1338
1339         if loop.success():
1340             return writer_pool.response()
1341         if not roots_map:
1342             raise arvados.errors.KeepWriteError(
1343                 "[{}] failed to write {}: no Keep services available ({})".format(
1344                     request_id, data_hash, loop.last_result()))
1345         else:
1346             service_errors = ((key, roots_map[key].last_result()['error'])
1347                               for key in sorted_roots
1348                               if roots_map[key].last_result()['error'])
1349             raise arvados.errors.KeepWriteError(
1350                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1351                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1352
1353     def _block_prefetch_worker(self):
1354         """The background downloader thread."""
1355         while True:
1356             try:
1357                 b = self._prefetch_queue.get()
1358                 if b is None:
1359                     return
1360                 self.get(b, prefetch=True)
1361             except Exception:
1362                 _logger.exception("Exception doing block prefetch")
1363
1364     def _start_prefetch_threads(self):
1365         if self._prefetch_threads is None:
1366             with self.lock:
1367                 if self._prefetch_threads is not None:
1368                     return
1369                 self._prefetch_queue = queue.Queue()
1370                 self._prefetch_threads = []
1371                 for i in range(0, self.num_prefetch_threads):
1372                     thread = threading.Thread(target=self._block_prefetch_worker)
1373                     self._prefetch_threads.append(thread)
1374                     thread.daemon = True
1375                     thread.start()
1376
1377     def block_prefetch(self, locator):
1378         """
1379         This relies on the fact that KeepClient implements a block cache,
1380         so repeated requests for the same block will not result in repeated
1381         downloads (unless the block is evicted from the cache.)  This method
1382         does not block.
1383         """
1384
1385         if self.block_cache.get(locator) is not None:
1386             return
1387
1388         self._start_prefetch_threads()
1389         self._prefetch_queue.put(locator)
1390
1391     def stop_prefetch_threads(self):
1392         with self.lock:
1393             if self._prefetch_threads is not None:
1394                 for t in self._prefetch_threads:
1395                     self._prefetch_queue.put(None)
1396                 for t in self._prefetch_threads:
1397                     t.join()
1398             self._prefetch_threads = None
1399             self._prefetch_queue = None
1400
1401     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1402         """A stub for put().
1403
1404         This method is used in place of the real put() method when
1405         using local storage (see constructor's local_store argument).
1406
1407         copies and num_retries arguments are ignored: they are here
1408         only for the sake of offering the same call signature as
1409         put().
1410
1411         Data stored this way can be retrieved via local_store_get().
1412         """
1413         md5 = hashlib.md5(data).hexdigest()
1414         locator = '%s+%d' % (md5, len(data))
1415         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1416             f.write(data)
1417         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1418                   os.path.join(self.local_store, md5))
1419         return locator
1420
1421     def local_store_get(self, loc_s, num_retries=None):
1422         """Companion to local_store_put()."""
1423         try:
1424             locator = KeepLocator(loc_s)
1425         except ValueError:
1426             raise arvados.errors.NotFoundError(
1427                 "Invalid data locator: '%s'" % loc_s)
1428         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1429             return b''
1430         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1431             return f.read()
1432
1433     def local_store_head(self, loc_s, num_retries=None):
1434         """Companion to local_store_put()."""
1435         try:
1436             locator = KeepLocator(loc_s)
1437         except ValueError:
1438             raise arvados.errors.NotFoundError(
1439                 "Invalid data locator: '%s'" % loc_s)
1440         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1441             return True
1442         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1443             return True