# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import copy
import collections
import datetime
import hashlib
import errno
import io
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import resource
import urllib.parse
import traceback
import weakref

from io import BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

from ._internal import basedirs, diskcache, Timer, parse_seq
from ._internal.pycurl import PyCurlHelper

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when they are not available (macOS). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
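
# A minimal sketch of how TCP keepalive constants like these are consumed
# (standard socket API; the fallback values above only matter on macOS
# builds whose socket module does not define them):
#
#   s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#   s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
#   s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)  # secs between probes
#   s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 9)     # probes before drop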

class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            # A hint without '@' makes split() return a single element,
            # so the two-name unpacking above raises ValueError (str.split
            # never raises IndexError here).
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
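
# Illustrative KeepLocator round trip (values are made up; the md5 shown is
# the well-known hash of the empty string, and the signature is a dummy
# 40-digit hex string with a hex Unix-timestamp expiry):
#
#   loc = KeepLocator(
#       'd41d8cd98f00b204e9800998ecf8427e+0'
#       '+Aabcdef0123456789abcdef0123456789abcdef01@565276c0')
#   loc.md5sum     -> 'd41d8cd98f00b204e9800998ecf8427e'
#   loc.size       -> 0
#   loc.stripped() -> 'd41d8cd98f00b204e9800998ecf8427e+0'
#   str(loc)       -> the original locator, permission hint included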


class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep'))

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles.  This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
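
        # Worked example of the disk-cache defaults above, assuming a
        # 100 GiB filesystem with 40 GiB free, no existing cache, and the
        # typical NOFILE limit of 1024 (so _max_slots == 128):
        #
        #   maxdisk   = 100 GiB * 0.10             = 10 GiB
        #   avail     = 40 GiB / 4                 = 10 GiB
        #   slot cap  = 128 slots * 64 MiB         =  8 GiB
        #   cache_max = min(10 GiB, 10 GiB, 8 GiB) =  8 GiB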

        self.cache_total = 0
        if self._disk_cache:
            self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()

    class _CacheSlot:
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            if self.content is not None:
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            self.content = None


    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.

        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
            return

        _evict_candidates = collections.deque(self._cache.values())
        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                continue

            sz = slot.size()
            slot.evict()
            self.cache_total -= sz
            del self._cache[slot.locator]


    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                del self._cache[n.locator]
                return None
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                self.cache_total += n.size()
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache._CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

    def set(self, slot, blob):
        try:
            if slot.set(blob):
                self.cache_total += slot.size()
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache.values())
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            pass
        finally:
            # Check if we should evict things from the cache.  Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # We only get here if the first slot.set() above raised an
            # exception.  The handler above may have adjusted the cache
            # limits downward to free up resources, which may allow this
            # retry to succeed.
            if slot.set(blob):
                self.cache_total += slot.size()
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))

        self.cap_cache()

    def clear(self):
        with self._cache_lock:
            self._cache.clear()
            self.cache_total = 0
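
# A rough usage sketch of KeepBlockCache (mirrors how _get_or_head below
# drives it): reserve a slot, fetch the block only if the slot is new, then
# publish the result so other threads waiting on the same locator wake up.
# fetch_from_keep() is a hypothetical stand-in for the real fetch path.
#
#   cache = KeepBlockCache()
#   slot, first = cache.reserve_cache(locator.md5sum)
#   if first:
#       blob = fetch_from_keep(locator)
#       cache.set(slot, blob)
#   else:
#       blob = slot.get()   # blocks until the fetching thread calls set()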

class _Counter:
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val
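
# _Counter instances back the transfer statistics KeepClient exposes
# (get/put/hits/misses and byte counters); for example, given a KeepClient
# instance kc:
#
#   kc.get_counter.add(1)
#   total_gets = kc.get_counter.get()   # safe to read from another thread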


class KeepClient(object):
    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT

    class _KeepService(PyCurlHelper):
        """Make requests to a single Keep service, and track results.

        A _KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            super().__init__()
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    else:
                        curl.setopt(pycurl.HTTPGET, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result['headers'].get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True


    class _KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Tell the outer loop that storage class tracking
                    # is disabled.
                    return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes
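
        # Worked example: with wanted_copies=2 and
        # wanted_storage_classes=['archival', 'default'], once one server
        # confirms {'default': 2}, pending_classes() returns ['archival']
        # and pending_copies() returns 0, so the writer threads keep going
        # only until the archival class is also satisfied.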

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()


    class _KeepWriterThreadPool:
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient._KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient._KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response


    class _KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError):
            """Exception for failed Keep writes

            TODO: Move this class to the module top level and document it

            @private
            """


        def __init__(self, queue, data, data_hash, timeout=None):
            super().__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in _KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout,
                                       headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("_KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = collections.defaultdict(int)
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in parse_seq(scch):
                    stored_class, _, stored_copies = confirmation.partition('=')
                    if stored_copies:
                        classes_confirmed[stored_class] += int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
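
        # Example of the confirmation-header parse above (header value is
        # illustrative): an 'x-keep-storage-classes-confirmed' value of
        # 'default=2, archival=1' becomes {'default': 2, 'archival': 1}
        # via parse_seq() and str.partition('=').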


    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 10.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = _Counter()
        self.download_counter = _Counter()
        self.put_counter = _Counter()
        self.get_counter = _Counter()
        self.hits_counter = _Counter()
        self.misses_counter = _Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        if num_prefetch_threads is not None:
            self.num_prefetch_threads = num_prefetch_threads
        else:
            self.num_prefetch_threads = 2
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # _KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
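
        # Worked example with the default non-proxy timeout (2, 256, 32768):
        #   attempt 0 -> (2, 256, 32768)
        #   attempt 1 -> (4, 256, 32768)
        #   attempt 2 -> (8, 256, 32768)
        # Only the connection timeout doubles; the read timeout and minimum
        # bandwidth stay fixed.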

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
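
        # This is rendezvous (highest-random-weight) hashing: every client
        # computes the same pseudo-random service ordering for a given
        # block, so readers probe servers in the same order writers
        # preferred, without any shared state.  Example (illustrative
        # uuid): for service uuid 'zzzzz-bi6l4-000000000000000', the
        # weight of block hash h is
        # hashlib.md5((h + '000000000000000').encode()).hexdigest().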

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots
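
        # Hint resolution example (cluster id and uuid are illustrative):
        #   +K@xyzzy                       -> 'https://keep.xyzzy.arvadosapi.com/'
        #   +K@zzzzz-bi6l4-123456789012345 -> that gateway service's
        #                                     '_service_root', when known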

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to _KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "Bearer %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self._KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then to each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            # GET results are bytes, so concatenate them as bytes.
            return b''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # This is a prefetch request to warm the cache;
                        # there is no need to wait for the result, so if
                        # the fetch is already in flight, return
                        # immediately.  Clear 'slot' to prevent the
                        # finally block from calling slot.set().
                        if slot.ready.is_set():
                            slot.get()
                        slot = None
                        return None
1151
1152                     blob = slot.get()
1153                     if blob is not None:
1154                         self.hits_counter.add(1)
1155                         return blob
1156
1157                     # If blob is None, this means either
1158                     #
1159                     # (a) another thread was fetching this block and
1160                     # failed with an error or
1161                     #
1162                     # (b) cache thrashing caused the slot to be
1163                     # evicted (content set to None) by another thread
1164                     # between the call to reserve_cache() and get().
1165                     #
1166                     # We'll handle these cases by reserving a new slot
1167                     # and then doing a full GET request.
1168                     slot = None
1169
1170             self.misses_counter.add(1)
1171
1172             # If the locator has hints specifying a prefix (indicating a
1173             # remote keepproxy) or the UUID of a local gateway service,
1174             # read data from the indicated service(s) instead of the usual
1175             # list of local disk services.
1176             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1177                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1178             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1179                                for hint in locator.hints if (
1180                                        hint.startswith('K@') and
1181                                        len(hint) == 29 and
1182                                        self._gateway_services.get(hint[2:])
1183                                        )])
1184             # Map root URLs to their _KeepService objects.
1185             roots_map = {
1186                 root: self._KeepService(root, self._user_agent_pool,
1187                                        upload_counter=self.upload_counter,
1188                                        download_counter=self.download_counter,
1189                                        headers=headers,
1190                                        insecure=self.insecure)
1191                 for root in hint_roots
1192             }
1193
1194             # See #3147 for a discussion of the loop implementation.  Highlights:
1195             # * Refresh the list of Keep services after each failure, in case
1196             #   it's being updated.
1197             # * Retry until we succeed, we're out of retries, or every available
1198             #   service has returned permanent failure.
1199             sorted_roots = []
1200             roots_map = {}
1201             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1202                                    backoff_start=2)
1203             for tries_left in loop:
1204                 try:
1205                     sorted_roots = self.map_new_services(
1206                         roots_map, locator,
1207                         force_rebuild=(tries_left < num_retries),
1208                         need_writable=False,
1209                         headers=headers)
1210                 except Exception as error:
1211                     loop.save_result(error)
1212                     continue
1213
1214                 # Query _KeepService objects that haven't returned
1215                 # permanent failure, in our specified shuffle order.
1216                 services_to_try = [roots_map[root]
1217                                    for root in sorted_roots
1218                                    if roots_map[root].usable()]
1219                 for keep_service in services_to_try:
1220                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1221                     if blob is not None:
1222                         break
1223                 loop.save_result((blob, len(services_to_try)))
1224
            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
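            # Fulfill the reserved cache slot even on failure (the value may
            # be None) so that other threads blocked on slot.get() wake up.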
            if slot is not None:
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(
                    request_id, loc_s, loop.attempts_str()),
                service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method gets a list of Keep services from the API server, and
        uploads the data to them concurrently, one service per worker
        thread.  Once the uploads are finished, if enough copies are saved,
        this method returns the most recent HTTP response body.  If requests
        fail to upload enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.  If None, the client's default storage classes are used.
        """
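
        # Minimal usage sketch (assumes a configured API client and a
        # reachable Keep cluster; the names are illustrative):
        #
        #   kc = arvados.KeepClient(api_client=arvados.api('v1'))
        #   locator = kc.put(b'foo', copies=2)
        #   assert kc.get(locator) == b'foo'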

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
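        # X-Keep-Desired-Replicas is advisory: a server that can store
        # multiple replicas per request (e.g. a keepproxy writing on the
        # client's behalf) uses it to decide how many to attempt.  This is
        # an assumption about server behavior, not enforced here.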
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            # Only ask this pass for what is still missing: the remaining
            # copies, and any storage classes not yet satisfied.
            writer_pool = KeepClient._KeepWriterThreadPool(
                data=data,
                data_hash=data_hash,
                copies=copies - done_copies,
                max_service_replicas=self.max_replicas_per_service,
                timeout=self.current_timeout(num_retries - tries_left),
                classes=pending_classes,
            )
            for service_root in sorted_roots:
                ks = roots_map[service_root]
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()),
                service_errors, label="service")

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    # A None sentinel on the queue tells this worker to exit.
                    return
                self.get(b, prefetch=True)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    def _start_prefetch_threads(self):
        # Double-checked locking: do a cheap unlocked test first, then
        # re-check under self.lock so that only one thread creates the
        # queue and the worker threads.
        if self._prefetch_threads is None:
            with self.lock:
                if self._prefetch_threads is not None:
                    return
                self._prefetch_queue = queue.Queue()
                self._prefetch_threads = []
                for _ in range(self.num_prefetch_threads):
                    thread = threading.Thread(target=self._block_prefetch_worker)
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def block_prefetch(self, locator):
        """Initiate a background download of the given block.

        This relies on the fact that KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.
        """

        if self.block_cache.get(locator) is not None:
            return

        self._start_prefetch_threads()
        self._prefetch_queue.put(locator)

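    # Typical readahead pattern (a sketch; the names are illustrative, not
    # part of this API):
    #
    #   for loc in upcoming_block_locators:
    #       keep_client.block_prefetch(loc)       # non-blocking
    #   data = keep_client.get(current_locator)   # likely served from cache
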
    def stop_prefetch_threads(self):
        with self.lock:
            if self._prefetch_threads is not None:
                # Enqueue one None sentinel per worker so that every thread
                # wakes up and exits, then wait for them all to finish.
                for _ in self._prefetch_threads:
                    self._prefetch_queue.put(None)
                for t in self._prefetch_threads:
                    t.join()
            self._prefetch_threads = None
            self._prefetch_queue = None

    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        The copies, num_retries, and classes arguments are ignored: they
        are here only for the sake of offering the same call signature
        as put().

        Data stored this way can be retrieved via local_store_get().
        """
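        # Write to a temporary file first, then rename it into place: the
        # rename is atomic on POSIX filesystems, so concurrent readers never
        # observe a partially written block.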
        if not isinstance(data, bytes):
            # Match put(): accept str input by encoding it to bytes.
            data = data.encode()
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

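    # Round-trip sketch (assumes the client was constructed with
    # local_store='/some/dir'; the path is illustrative):
    #
    #   loc = kc.local_store_put(b'foo')
    #   assert kc.local_store_get(loc) == b'foo'
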
    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True