1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 import copy
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
15 import collections
16 import datetime
17 import hashlib
18 import errno
19 import io
20 import logging
21 import math
22 import os
23 import pycurl
24 import queue
25 import re
26 import socket
27 import ssl
28 import sys
29 import threading
30 import resource
31 from . import timer
32 import urllib.parse
33 import traceback
34 import weakref
35
36 if sys.version_info >= (3, 0):
37     from io import BytesIO
38 else:
39     from cStringIO import StringIO as BytesIO
40
41 import arvados
42 import arvados.config as config
43 import arvados.errors
44 import arvados.retry as retry
45 import arvados.util
46 import arvados.diskcache
47 from arvados._pycurlhelper import PyCurlHelper
48
49 _logger = logging.getLogger('arvados.keep')
50 global_client_object = None
51
52
53 # Monkey patch TCP constants when not available (macOS). Values sourced from:
54 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
55 if sys.platform == 'darwin':
56     if not hasattr(socket, 'TCP_KEEPALIVE'):
57         socket.TCP_KEEPALIVE = 0x010
58     if not hasattr(socket, 'TCP_KEEPINTVL'):
59         socket.TCP_KEEPINTVL = 0x101
60     if not hasattr(socket, 'TCP_KEEPCNT'):
61         socket.TCP_KEEPCNT = 0x102
62
63
64 class KeepLocator(object):
65     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
66     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
67
68     def __init__(self, locator_str):
69         self.hints = []
70         self._perm_sig = None
71         self._perm_expiry = None
72         pieces = iter(locator_str.split('+'))
73         self.md5sum = next(pieces)
74         try:
75             self.size = int(next(pieces))
76         except StopIteration:
77             self.size = None
78         for hint in pieces:
79             if self.HINT_RE.match(hint) is None:
80                 raise ValueError("invalid hint format: {}".format(hint))
81             elif hint.startswith('A'):
82                 self.parse_permission_hint(hint)
83             else:
84                 self.hints.append(hint)
85
86     def __str__(self):
87         return '+'.join(
88             native_str(s)
89             for s in [self.md5sum, self.size,
90                       self.permission_hint()] + self.hints
91             if s is not None)
92
93     def stripped(self):
94         if self.size is not None:
95             return "%s+%i" % (self.md5sum, self.size)
96         else:
97             return self.md5sum
98
99     def _make_hex_prop(name, length):
100         # Build and return a new property with the given name that
101         # must be a hex string of the given length.
102         data_name = '_{}'.format(name)
103         def getter(self):
104             return getattr(self, data_name)
105         def setter(self, hex_str):
106             if not arvados.util.is_hex(hex_str, length):
107                 raise ValueError("{} is not a {}-digit hex string: {!r}".
108                                  format(name, length, hex_str))
109             setattr(self, data_name, hex_str)
110         return property(getter, setter)
111
112     md5sum = _make_hex_prop('md5sum', 32)
113     perm_sig = _make_hex_prop('perm_sig', 40)
114
115     @property
116     def perm_expiry(self):
117         return self._perm_expiry
118
119     @perm_expiry.setter
120     def perm_expiry(self, value):
121         if not arvados.util.is_hex(value, 1, 8):
122             raise ValueError(
123                 "permission timestamp must be a hex Unix timestamp: {}".
124                 format(value))
125         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
126
127     def permission_hint(self):
128         data = [self.perm_sig, self.perm_expiry]
129         if None in data:
130             return None
131         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
132         return "A{}@{:08x}".format(*data)
133
134     def parse_permission_hint(self, s):
135         try:
136             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
137         except ValueError:
138             raise ValueError("bad permission hint {}".format(s))
139
140     def permission_expired(self, as_of_dt=None):
141         if self.perm_expiry is None:
142             return False
143         elif as_of_dt is None:
144             as_of_dt = datetime.datetime.now()
145         return self.perm_expiry <= as_of_dt
146
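# Illustrative sketch (not part of this module's API): parsing a signed
# locator string with KeepLocator.  The hash, signature, and expiry below
# are made-up values for demonstration only.
#
#   loc = KeepLocator(
#       "acbd18db4cc2f85cedef654fccc4a4d8+3"
#       "+A0123456789abcdef0123456789abcdef01234567@653e1a2b")
#   loc.md5sum                # "acbd18db4cc2f85cedef654fccc4a4d8"
#   loc.size                  # 3
#   loc.stripped()            # "acbd18db4cc2f85cedef654fccc4a4d8+3"
#   loc.permission_expired()  # compares the hint's expiry to now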
147
148 class Keep(object):
149     """Simple interface to a global KeepClient object.
150
151     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
152     own API client.  The global KeepClient will build an API client from the
153     current Arvados configuration, which may not match the one you built.
154     """
155     _last_key = None
156
157     @classmethod
158     def global_client_object(cls):
159         global global_client_object
160         # Previously, KeepClient would change its behavior at runtime based
161         # on these configuration settings.  We simulate that behavior here
162         # by checking the values and returning a new KeepClient if any of
163         # them have changed.
164         key = (config.get('ARVADOS_API_HOST'),
165                config.get('ARVADOS_API_TOKEN'),
166                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
167                config.get('ARVADOS_KEEP_PROXY'),
168                os.environ.get('KEEP_LOCAL_STORE'))
169         if (global_client_object is None) or (cls._last_key != key):
170             global_client_object = KeepClient()
171             cls._last_key = key
172         return global_client_object
173
174     @staticmethod
175     def get(locator, **kwargs):
176         return Keep.global_client_object().get(locator, **kwargs)
177
178     @staticmethod
179     def put(data, **kwargs):
180         return Keep.global_client_object().put(data, **kwargs)
181
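# Preferred usage sketch, instead of the deprecated global client above:
# build your own KeepClient around your own API client.
#
#   api = arvados.api('v1')
#   kc = KeepClient(api_client=api)
#   locator = kc.put(b'foo')
#   data = kc.get(locator)
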
182 class KeepBlockCache(object):
183     def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
184         self.cache_max = cache_max
185         self._cache = collections.OrderedDict()
186         self._cache_lock = threading.Lock()
187         self._max_slots = max_slots
188         self._disk_cache = disk_cache
189         self._disk_cache_dir = disk_cache_dir
190         self._cache_updating = threading.Condition(self._cache_lock)
191
192         if self._disk_cache and self._disk_cache_dir is None:
193             self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
194             os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
195
196         if self._max_slots == 0:
197             if self._disk_cache:
198                 # Each block uses two file descriptors, one used to
199                 # open it initially and hold the flock(), and a second
200                 # hidden one used by mmap().
201                 #
202                 # Set max slots to 1/8 of maximum file handles.  This
203                 # means we'll use at most 1/4 of total file handles.
204                 #
205                 # NOFILE typically defaults to 1024 on Linux so this
206                 # is 128 slots (256 file handles), which means we can
207                 # cache up to 8 GiB of 64 MiB blocks.  This leaves
208                 # 768 file handles for sockets and other stuff.
209                 #
210                 # When we want the ability to have more cache (e.g. in
211                 # arv-mount) we'll increase rlimit before calling
212                 # this.
213                 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
214             else:
215                 # RAM cache slots
216                 self._max_slots = 512
217
218         if self.cache_max == 0:
219             if self._disk_cache:
220                 fs = os.statvfs(self._disk_cache_dir)
221                 # Calculation of available space incorporates existing cache usage
222                 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
223                 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
224                 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
225                 # pick smallest of:
226                 # 10% of total disk size
227                 # 25% of available space
228                 # max_slots * 64 MiB
229                 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
230             else:
231                 # 256 MiB in RAM
232                 self.cache_max = (256 * 1024 * 1024)
233
234         self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
235
236         if self._disk_cache:
237             self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
238             self.cap_cache()
239
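    # Worked example of the disk cache sizing above (sketch, made-up
    # numbers): on a 1 TiB filesystem with 400 GiB free, no existing
    # cache, and the default 128 slots:
    #
    #   maxdisk   = 1 TiB * 0.10                  = ~102 GiB
    #   avail     = 400 GiB / 4                   = 100 GiB
    #   slot cap  = 128 * 64 MiB                  = 8 GiB
    #   cache_max = min(102 GiB, 100 GiB, 8 GiB)  = 8 GiB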
240
241     class CacheSlot(object):
242         __slots__ = ("locator", "ready", "content")
243
244         def __init__(self, locator):
245             self.locator = locator
246             self.ready = threading.Event()
247             self.content = None
248
249         def get(self):
250             self.ready.wait()
251             return self.content
252
253         def set(self, value):
254             self.content = value
255             self.ready.set()
256
257         def size(self):
258             if self.content is None:
259                 return 0
260             else:
261                 return len(self.content)
262
263         def evict(self):
264             self.content = None
265             return self.gone()
266
267         def gone(self):
268             return (self.content is None)
269
270     def _resize_cache(self, cache_max, max_slots):
271         # Try and make sure the contents of the cache do not exceed
272         # the supplied maximums.
273
274         sm = 0
275         for slot in self._cache.values():
276             sm += slot.size()
277
278         if sm <= cache_max and len(self._cache) <= max_slots:
279             return
280
281         _evict_candidates = collections.deque(self._cache.values())
282         while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots):
283             slot = _evict_candidates.popleft()
284             if not slot.ready.is_set():
285                 continue
286
287             if slot.content is None:
288                 # An error left this slot empty; drop it outright.
289                 del self._cache[slot.locator]
290                 continue
291
292             sz = slot.size()
293
294             # If evict returns false it means the
295             # underlying disk cache couldn't lock the file
296             # for deletion because another process was using
297             # it. Don't count it as reducing the amount
298             # of data in the cache, find something else to
299             # throw out.
300             if slot.evict():
301                 sm -= sz
302
303             # Check to make sure the underlying data is gone.
304             if slot.gone():
305                 # Either way we forget about it: either the
306                 # other process will delete it, or, if we need
307                 # it again and it is still there, we'll find
308                 # it on disk.
309                 del self._cache[slot.locator]
310
311
312     def cap_cache(self):
313         '''Cap the cache size to self.cache_max'''
314         with self._cache_updating:
315             self._resize_cache(self.cache_max, self._max_slots)
316             self._cache_updating.notify_all()
317
318     def _get(self, locator):
319         # Test if the locator is already in the cache
320         if locator in self._cache:
321             n = self._cache[locator]
322             if n.ready.is_set() and n.content is None:
323                 del self._cache[n.locator]
324                 return None
325             self._cache.move_to_end(locator)
326             return n
327         if self._disk_cache:
328             # see if it exists on disk
329             n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
330             if n is not None:
331                 self._cache[n.locator] = n
332                 return n
333         return None
334
335     def get(self, locator):
336         with self._cache_lock:
337             return self._get(locator)
338
339     def reserve_cache(self, locator):
340         '''Reserve a cache slot for the specified locator,
341         or return the existing slot.'''
342         with self._cache_updating:
343             n = self._get(locator)
344             if n:
345                 return n, False
346             else:
347                 # Add a new cache slot for the locator
348                 self._resize_cache(self.cache_max, self._max_slots-1)
349                 while len(self._cache) >= self._max_slots:
350                     # If there isn't a slot available, we need to
351                     # wait for something to happen that releases one
352                     # of the cache slots.  Sleep for up to 200 ms, or
353                     # until woken up by another thread.
354                     self._cache_updating.wait(timeout=0.2)
355                     self._resize_cache(self.cache_max, self._max_slots-1)
356
357                 if self._disk_cache:
358                     n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
359                 else:
360                     n = KeepBlockCache.CacheSlot(locator)
361                 self._cache[n.locator] = n
362                 return n, True
363
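    # Typical reader protocol built on reserve_cache()/get() (sketch;
    # fetch_block is a hypothetical stand-in for a Keep GET request):
    #
    #   slot, first = cache.reserve_cache(md5sum)
    #   if first:
    #       cache.set(slot, fetch_block(md5sum))  # we own the fetch
    #   data = slot.get()  # blocks until some thread calls set()
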
364     def set(self, slot, blob):
365         try:
366             slot.set(blob)
367             return
368         except OSError as e:
369             if e.errno == errno.ENOMEM:
370                 # Reduce max slots to current - 4, cap cache and retry
371                 with self._cache_lock:
372                     self._max_slots = max(4, len(self._cache) - 4)
373             elif e.errno == errno.ENOSPC:
374                 # Reduce disk max space to current - 256 MiB, cap cache and retry
375                 with self._cache_lock:
376                     sm = sum([st.size() for st in self._cache.values()])
377                     self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
378             elif e.errno == errno.ENODEV:
379                 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
380         except Exception as e:
381             pass
382         finally:
383             # Check if we should evict things from the cache.  Either
384             # because we added a new thing or there was an error and
385             # we possibly adjusted the limits down, so we might need
386             # to push something out.
387             self.cap_cache()
388
389         try:
390             # We only get here if there was an error the first time.  The
391             # exception handler above may have adjusted limits downward
392             # to free up resources, which could let the operation
393             # succeed this time.
394             slot.set(blob)
395         except Exception as e:
396             # It failed again.  Give up.
397             slot.set(None)
398             raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
399
400         self.cap_cache()
401
402 class Counter(object):
403     def __init__(self, v=0):
404         self._lk = threading.Lock()
405         self._val = v
406
407     def add(self, v):
408         with self._lk:
409             self._val += v
410
411     def get(self):
412         with self._lk:
413             return self._val
414
415
416 class KeepClient(object):
417     DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
418     DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
419
420     class KeepService(PyCurlHelper):
421         """Make requests to a single Keep service, and track results.
422
423         A KeepService is intended to last long enough to perform one
424         transaction (GET or PUT) against one Keep service. This can
425         involve calling either get() or put() multiple times in order
426         to retry after transient failures. However, calling both get()
427         and put() on a single instance -- or using the same instance
428         to access two different Keep services -- will not produce
429         sensible behavior.
430         """
431
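        # Intended lifecycle (sketch; the root URL and header values are
        # illustrative only): one instance per transaction, then inspect
        # last_result() for details.
        #
        #   ks = KeepClient.KeepService("https://keep0.example.com/",
        #                               headers={'Authorization': 'OAuth2 ...'})
        #   body = ks.get(KeepLocator(loc_s))
        #   ks.last_result()  # dict with 'status_code', 'error', etc.
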
432         HTTP_ERRORS = (
433             socket.error,
434             ssl.SSLError,
435             arvados.errors.HttpError,
436         )
437
438         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
439                      upload_counter=None,
440                      download_counter=None,
441                      headers={},
442                      insecure=False):
443             super(KeepClient.KeepService, self).__init__()
444             self.root = root
445             self._user_agent_pool = user_agent_pool
446             self._result = {'error': None}
447             self._usable = True
448             self._session = None
449             self._socket = None
450             self.get_headers = {'Accept': 'application/octet-stream'}
451             self.get_headers.update(headers)
452             self.put_headers = headers
453             self.upload_counter = upload_counter
454             self.download_counter = download_counter
455             self.insecure = insecure
456
457         def usable(self):
458             """Is it worth attempting a request?"""
459             return self._usable
460
461         def finished(self):
462             """Did the request succeed or encounter permanent failure?"""
463             return self._result['error'] == False or not self._usable
464
465         def last_result(self):
466             return self._result
467
468         def _get_user_agent(self):
469             try:
470                 return self._user_agent_pool.get(block=False)
471             except queue.Empty:
472                 return pycurl.Curl()
473
474         def _put_user_agent(self, ua):
475             try:
476                 ua.reset()
477                 self._user_agent_pool.put(ua, block=False)
478             except:
479                 ua.close()
480
481         def get(self, locator, method="GET", timeout=None):
482             # locator is a KeepLocator object.
483             url = self.root + str(locator)
484             _logger.debug("Request: %s %s", method, url)
485             curl = self._get_user_agent()
486             ok = None
487             try:
488                 with timer.Timer() as t:
489                     self._headers = {}
490                     response_body = BytesIO()
491                     curl.setopt(pycurl.NOSIGNAL, 1)
492                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
493                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
494                     curl.setopt(pycurl.URL, url.encode('utf-8'))
495                     curl.setopt(pycurl.HTTPHEADER, [
496                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
497                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
498                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
499                     if self.insecure:
500                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
501                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
502                     else:
503                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
504                     if method == "HEAD":
505                         curl.setopt(pycurl.NOBODY, True)
506                     else:
507                         curl.setopt(pycurl.HTTPGET, True)
508                     self._setcurltimeouts(curl, timeout, method=="HEAD")
509
510                     try:
511                         curl.perform()
512                     except Exception as e:
513                         raise arvados.errors.HttpError(0, str(e))
514                     finally:
515                         if self._socket:
516                             self._socket.close()
517                             self._socket = None
518                     self._result = {
519                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
520                         'body': response_body.getvalue(),
521                         'headers': self._headers,
522                         'error': False,
523                     }
524
525                 ok = retry.check_http_response_success(self._result['status_code'])
526                 if not ok:
527                     self._result['error'] = arvados.errors.HttpError(
528                         self._result['status_code'],
529                         self._headers.get('x-status-line', 'Error'))
530             except self.HTTP_ERRORS as e:
531                 self._result = {
532                     'error': e,
533                 }
534             self._usable = ok != False  # still usable if ok is True or None
535             if self._result.get('status_code', None):
536                 # The client worked well enough to get an HTTP status
537                 # code, so presumably any problems are just on the
538                 # server side and it's OK to reuse the client.
539                 self._put_user_agent(curl)
540             else:
541                 # Don't return this client to the pool, in case it's
542                 # broken.
543                 curl.close()
544             if not ok:
545                 _logger.debug("Request fail: GET %s => %s: %s",
546                               url, type(self._result['error']), str(self._result['error']))
547                 return None
548             if method == "HEAD":
549                 _logger.info("HEAD %s: %s bytes",
550                              self._result['status_code'],
551                              self._result['headers'].get('content-length'))
552                 if self._result['headers'].get('x-keep-locator'):
553                     # This is a response to a remote block copy request, return
554                     # the local copy block locator.
555                     return self._result['headers'].get('x-keep-locator')
556                 return True
557
558             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
559                          self._result['status_code'],
560                          len(self._result['body']),
561                          t.msecs,
562                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
563
564             if self.download_counter:
565                 self.download_counter.add(len(self._result['body']))
566             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
567             if resp_md5 != locator.md5sum:
568                 _logger.warning("Checksum fail: md5(%s) = %s",
569                                 url, resp_md5)
570                 self._result['error'] = arvados.errors.HttpError(
571                     0, 'Checksum fail')
572                 return None
573             return self._result['body']
574
575         def put(self, hash_s, body, timeout=None, headers={}):
576             put_headers = copy.copy(self.put_headers)
577             put_headers.update(headers)
578             url = self.root + hash_s
579             _logger.debug("Request: PUT %s", url)
580             curl = self._get_user_agent()
581             ok = None
582             try:
583                 with timer.Timer() as t:
584                     self._headers = {}
585                     body_reader = BytesIO(body)
586                     response_body = BytesIO()
587                     curl.setopt(pycurl.NOSIGNAL, 1)
588                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
589                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
590                     curl.setopt(pycurl.URL, url.encode('utf-8'))
591                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
592                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
593                     # response) instead of sending the request body immediately.
594                     # This allows the server to reject the request if the request
595                     # is invalid or the server is read-only, without waiting for
596                     # the client to send the entire block.
597                     curl.setopt(pycurl.UPLOAD, True)
598                     curl.setopt(pycurl.INFILESIZE, len(body))
599                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
600                     curl.setopt(pycurl.HTTPHEADER, [
601                         '{}: {}'.format(k,v) for k,v in put_headers.items()])
602                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
603                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
604                     if self.insecure:
605                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
606                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
607                     else:
608                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
609                     self._setcurltimeouts(curl, timeout)
610                     try:
611                         curl.perform()
612                     except Exception as e:
613                         raise arvados.errors.HttpError(0, str(e))
614                     finally:
615                         if self._socket:
616                             self._socket.close()
617                             self._socket = None
618                     self._result = {
619                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
620                         'body': response_body.getvalue().decode('utf-8'),
621                         'headers': self._headers,
622                         'error': False,
623                     }
624                 ok = retry.check_http_response_success(self._result['status_code'])
625                 if not ok:
626                     self._result['error'] = arvados.errors.HttpError(
627                         self._result['status_code'],
628                         self._headers.get('x-status-line', 'Error'))
629             except self.HTTP_ERRORS as e:
630                 self._result = {
631                     'error': e,
632                 }
633             self._usable = ok != False # still usable if ok is True or None
634             if self._result.get('status_code', None):
635                 # Client is functional. See comment in get().
636                 self._put_user_agent(curl)
637             else:
638                 curl.close()
639             if not ok:
640                 _logger.debug("Request fail: PUT %s => %s: %s",
641                               url, type(self._result['error']), str(self._result['error']))
642                 return False
643             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
644                          self._result['status_code'],
645                          len(body),
646                          t.msecs,
647                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
648             if self.upload_counter:
649                 self.upload_counter.add(len(body))
650             return True
651
652
653     class KeepWriterQueue(queue.Queue):
654         def __init__(self, copies, classes=[]):
655             queue.Queue.__init__(self) # Old-style superclass
656             self.wanted_copies = copies
657             self.wanted_storage_classes = classes
658             self.successful_copies = 0
659             self.confirmed_storage_classes = {}
660             self.response = None
661             self.storage_classes_tracking = True
662             self.queue_data_lock = threading.RLock()
663             self.pending_tries = max(copies, len(classes))
664             self.pending_tries_notification = threading.Condition()
665
666         def write_success(self, response, replicas_nr, classes_confirmed):
667             with self.queue_data_lock:
668                 self.successful_copies += replicas_nr
669                 if classes_confirmed is None:
670                     self.storage_classes_tracking = False
671                 elif self.storage_classes_tracking:
672                     for st_class, st_copies in classes_confirmed.items():
673                         try:
674                             self.confirmed_storage_classes[st_class] += st_copies
675                         except KeyError:
676                             self.confirmed_storage_classes[st_class] = st_copies
677                     self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
678                 self.response = response
679             with self.pending_tries_notification:
680                 self.pending_tries_notification.notify_all()
681
682         def write_fail(self, ks):
683             with self.pending_tries_notification:
684                 self.pending_tries += 1
685                 self.pending_tries_notification.notify()
686
687         def pending_copies(self):
688             with self.queue_data_lock:
689                 return self.wanted_copies - self.successful_copies
690
691         def satisfied_classes(self):
692             with self.queue_data_lock:
693                 if not self.storage_classes_tracking:
694                     # Tell the outer loop that storage class
695                     # tracking is disabled.
696                     return None
697             return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
698
699         def pending_classes(self):
700             with self.queue_data_lock:
701                 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
702                     return []
703                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
704                 for st_class, st_copies in self.confirmed_storage_classes.items():
705                     if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
706                         unsatisfied_classes.remove(st_class)
707                 return unsatisfied_classes
708
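        # Example of the bookkeeping above (sketch): with wanted_copies=2,
        # wanted_storage_classes=['ssd', 'archive'], and
        # confirmed_storage_classes={'ssd': 2, 'archive': 1},
        # pending_classes() returns ['archive'] -- that class still needs
        # one more confirmed copy before the write is satisfied.
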
709         def get_next_task(self):
710             with self.pending_tries_notification:
711                 while True:
712                     if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
713                         # This notify_all() is unnecessary --
714                         # write_success() already called notify_all()
715                         # when pending<1 became true, so it's not
716                         # possible for any other thread to be in
717                         # wait() now -- but it's cheap insurance
718                         # against deadlock so we do it anyway:
719                         self.pending_tries_notification.notify_all()
720                         # Drain the queue and then raise Queue.Empty
721                         while True:
722                             self.get_nowait()
723                             self.task_done()
724                     elif self.pending_tries > 0:
725                         service, service_root = self.get_nowait()
726                         if service.finished():
727                             self.task_done()
728                             continue
729                         self.pending_tries -= 1
730                         return service, service_root
731                     elif self.empty():
732                         self.pending_tries_notification.notify_all()
733                         raise queue.Empty
734                     else:
735                         self.pending_tries_notification.wait()
736
737
738     class KeepWriterThreadPool(object):
739         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
740             self.total_task_nr = 0
741             if (not max_service_replicas) or (max_service_replicas >= copies):
742                 num_threads = 1
743             else:
744                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
745             _logger.debug("Pool max threads is %d", num_threads)
746             self.workers = []
747             self.queue = KeepClient.KeepWriterQueue(copies, classes)
748             # Create workers
749             for _ in range(num_threads):
750                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
751                 self.workers.append(w)
752
753         def add_task(self, ks, service_root):
754             self.queue.put((ks, service_root))
755             self.total_task_nr += 1
756
757         def done(self):
758             return self.queue.successful_copies, self.queue.satisfied_classes()
759
760         def join(self):
761             # Start workers
762             for worker in self.workers:
763                 worker.start()
764             # Wait for finished work
765             self.queue.join()
766
767         def response(self):
768             return self.queue.response
769
770
771     class KeepWriterThread(threading.Thread):
772         class TaskFailed(RuntimeError): pass
773
774         def __init__(self, queue, data, data_hash, timeout=None):
775             super(KeepClient.KeepWriterThread, self).__init__()
776             self.timeout = timeout
777             self.queue = queue
778             self.data = data
779             self.data_hash = data_hash
780             self.daemon = True
781
782         def run(self):
783             while True:
784                 try:
785                     service, service_root = self.queue.get_next_task()
786                 except queue.Empty:
787                     return
788                 try:
789                     locator, copies, classes = self.do_task(service, service_root)
790                 except Exception as e:
791                     if not isinstance(e, self.TaskFailed):
792                         _logger.exception("Exception in KeepWriterThread")
793                     self.queue.write_fail(service)
794                 else:
795                     self.queue.write_success(locator, copies, classes)
796                 finally:
797                     self.queue.task_done()
798
799         def do_task(self, service, service_root):
800             classes = self.queue.pending_classes()
801             headers = {}
802             if len(classes) > 0:
803                 classes.sort()
804                 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
805             success = bool(service.put(self.data_hash,
806                                         self.data,
807                                         timeout=self.timeout,
808                                         headers=headers))
809             result = service.last_result()
810
811             if not success:
812                 if result.get('status_code'):
813                     _logger.debug("Request fail: PUT %s => %s %s",
814                                   self.data_hash,
815                                   result.get('status_code'),
816                                   result.get('body'))
817                 raise self.TaskFailed()
818
819             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
820                           str(threading.current_thread()),
821                           self.data_hash,
822                           len(self.data),
823                           service_root)
824             try:
825                 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
826             except (KeyError, ValueError):
827                 replicas_stored = 1
828
829             classes_confirmed = {}
830             try:
831                 scch = result['headers']['x-keep-storage-classes-confirmed']
832                 for confirmation in scch.replace(' ', '').split(','):
833                     if '=' in confirmation:
834                         stored_class, stored_copies = confirmation.split('=')[:2]
835                         classes_confirmed[stored_class] = int(stored_copies)
836             except (KeyError, ValueError):
837                 # Storage classes confirmed header missing or corrupt
838                 classes_confirmed = None
839
840             return result['body'].strip(), replicas_stored, classes_confirmed
841
842
843     def __init__(self, api_client=None, proxy=None,
844                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
845                  api_token=None, local_store=None, block_cache=None,
846                  num_retries=10, session=None, num_prefetch_threads=None):
847         """Initialize a new KeepClient.
848
849         Arguments:
850         :api_client:
851           The API client to use to find Keep services.  If not
852           provided, KeepClient will build one from available Arvados
853           configuration.
854
855         :proxy:
856           If specified, this KeepClient will send requests to this Keep
857           proxy.  Otherwise, KeepClient will fall back to the setting of the
858           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
859           If you want to ensure KeepClient does not use a proxy, pass in an
860           empty string.
861
862         :timeout:
863           The initial timeout (in seconds) for HTTP requests to non-proxy
864           Keep servers.  A tuple of three floats is interpreted as
865           (connection_timeout, read_timeout, minimum_bandwidth). A connection
866           will be aborted if the average traffic rate falls below
867           minimum_bandwidth bytes per second over an interval of read_timeout
868           seconds. Because timeouts are often a result of transient server
869           load, the actual connection timeout will be increased by a factor
870           of two on each retry.
871           Default: (2, 256, 32768).
872
873         :proxy_timeout:
874           The initial timeout (in seconds) for HTTP requests to
875           Keep proxies. A tuple of three floats is interpreted as
876           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
877           described above for adjusting connection timeouts on retry also
878           applies.
879           Default: (20, 256, 32768).
880
881         :api_token:
882           If you're not using an API client, but only talking
883           directly to a Keep proxy, this parameter specifies an API token
884           to authenticate Keep requests.  It is an error to specify both
885           api_client and api_token.  If you specify neither, KeepClient
886           will use one available from the Arvados configuration.
887
888         :local_store:
889           If specified, this KeepClient will bypass Keep
890           services, and save data to the named directory.  If unspecified,
891           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
892           environment variable.  If you want to ensure KeepClient does not
893           use local storage, pass in an empty string.  This is primarily
894           intended to mock a server for testing.
895
896         :num_retries:
897           The default number of times to retry failed requests.
898           This will be used as the default num_retries value when get() and
899           put() are called.  Default 10.
900         """
901         self.lock = threading.Lock()
902         if proxy is None:
903             if config.get('ARVADOS_KEEP_SERVICES'):
904                 proxy = config.get('ARVADOS_KEEP_SERVICES')
905             else:
906                 proxy = config.get('ARVADOS_KEEP_PROXY')
907         if api_token is None:
908             if api_client is None:
909                 api_token = config.get('ARVADOS_API_TOKEN')
910             else:
911                 api_token = api_client.api_token
912         elif api_client is not None:
913             raise ValueError(
914                 "can't build KeepClient with both API client and token")
915         if local_store is None:
916             local_store = os.environ.get('KEEP_LOCAL_STORE')
917
918         if api_client is None:
919             self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
920         else:
921             self.insecure = api_client.insecure
922
923         self.block_cache = block_cache if block_cache else KeepBlockCache()
924         self.timeout = timeout
925         self.proxy_timeout = proxy_timeout
926         self._user_agent_pool = queue.LifoQueue()
927         self.upload_counter = Counter()
928         self.download_counter = Counter()
929         self.put_counter = Counter()
930         self.get_counter = Counter()
931         self.hits_counter = Counter()
932         self.misses_counter = Counter()
933         self._storage_classes_unsupported_warning = False
934         self._default_classes = []
935         if num_prefetch_threads is not None:
936             self.num_prefetch_threads = num_prefetch_threads
937         else:
938             self.num_prefetch_threads = 2
939         self._prefetch_queue = None
940         self._prefetch_threads = None
941
942         if local_store:
943             self.local_store = local_store
944             self.head = self.local_store_head
945             self.get = self.local_store_get
946             self.put = self.local_store_put
947         else:
948             self.num_retries = num_retries
949             self.max_replicas_per_service = None
950             if proxy:
951                 proxy_uris = proxy.split()
952                 for i in range(len(proxy_uris)):
953                     if not proxy_uris[i].endswith('/'):
954                         proxy_uris[i] += '/'
955                     # URL validation
956                     url = urllib.parse.urlparse(proxy_uris[i])
957                     if not (url.scheme and url.netloc):
958                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
959                 self.api_token = api_token
960                 self._gateway_services = {}
961                 self._keep_services = [{
962                     'uuid': "00000-bi6l4-%015d" % idx,
963                     'service_type': 'proxy',
964                     '_service_root': uri,
965                     } for idx, uri in enumerate(proxy_uris)]
966                 self._writable_services = self._keep_services
967                 self.using_proxy = True
968                 self._static_services_list = True
969             else:
970                 # It's important to avoid instantiating an API client
971                 # unless we actually need one, for testing's sake.
972                 if api_client is None:
973                     api_client = arvados.api('v1')
974                 self.api_client = api_client
975                 self.api_token = api_client.api_token
976                 self._gateway_services = {}
977                 self._keep_services = None
978                 self._writable_services = None
979                 self.using_proxy = None
980                 self._static_services_list = False
981                 try:
982                     self._default_classes = [
983                         k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
984                 except KeyError:
985                     # We're talking to an old cluster
986                     pass
987
988     def current_timeout(self, attempt_number):
989         """Return the appropriate timeout to use for this client.
990
991         The proxy timeout setting if the backend service is currently a proxy,
992         the regular timeout setting otherwise.  The `attempt_number` indicates
993         how many times the operation has been tried already (starting from 0
994         for the first try), and scales the connection timeout portion of the
995         return value accordingly.
996
997         """
998         # TODO(twp): the timeout should be a property of a
999         # KeepService, not a KeepClient. See #4488.
1000         t = self.proxy_timeout if self.using_proxy else self.timeout
1001         if len(t) == 2:
1002             return (t[0] * (1 << attempt_number), t[1])
1003         else:
1004             return (t[0] * (1 << attempt_number), t[1], t[2])
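
    # Backoff arithmetic sketch: with the default non-proxy timeout
    # (2, 256, 32768), the connection timeout doubles on each attempt
    # while the read timeout and minimum bandwidth stay fixed:
    #
    #   kc.current_timeout(0)  # -> (2, 256, 32768)
    #   kc.current_timeout(2)  # -> (8, 256, 32768)
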
1005     def _any_nondisk_services(self, service_list):
1006         return any(ks.get('service_type', 'disk') != 'disk'
1007                    for ks in service_list)
1008
1009     def build_services_list(self, force_rebuild=False):
1010         if (self._static_services_list or
1011               (self._keep_services and not force_rebuild)):
1012             return
1013         with self.lock:
1014             try:
1015                 keep_services = self.api_client.keep_services().accessible()
1016             except Exception:  # API server predates Keep services.
1017                 keep_services = self.api_client.keep_disks().list()
1018
1019             # Gateway services are only used when specified by UUID,
1020             # so there's nothing to gain by filtering them by
1021             # service_type.
1022             self._gateway_services = {ks['uuid']: ks for ks in
1023                                       keep_services.execute()['items']}
1024             if not self._gateway_services:
1025                 raise arvados.errors.NoKeepServersError()
1026
1027             # Precompute the base URI for each service.
1028             for r in self._gateway_services.values():
1029                 host = r['service_host']
1030                 if not host.startswith('[') and host.find(':') >= 0:
1031                     # IPv6 URIs must be formatted like http://[::1]:80/...
1032                     host = '[' + host + ']'
1033                 r['_service_root'] = "{}://{}:{:d}/".format(
1034                     'https' if r['service_ssl_flag'] else 'http',
1035                     host,
1036                     r['service_port'])
1037
1038             _logger.debug(str(self._gateway_services))
1039             self._keep_services = [
1040                 ks for ks in self._gateway_services.values()
1041                 if not ks.get('service_type', '').startswith('gateway:')]
1042             self._writable_services = [ks for ks in self._keep_services
1043                                        if not ks.get('read_only')]
1044
1045             # For disk type services, max_replicas_per_service is 1
1046             # It is unknown (unlimited) for other service types.
1047             if self._any_nondisk_services(self._writable_services):
1048                 self.max_replicas_per_service = None
1049             else:
1050                 self.max_replicas_per_service = 1
1051
1052     def _service_weight(self, data_hash, service_uuid):
1053         """Compute the weight of a Keep service endpoint for a data
1054         block with a known hash.
1055
1056         The weight is md5(h + u) where u is the last 15 characters of
1057         the service endpoint's UUID.
1058         """
1059         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1060
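    # Rendezvous hashing sketch: every client computes the same
    # deterministic per-(block, service) weight, so all clients probe
    # services in the same order without any coordination.
    # service_uuids below is a hypothetical list of Keep service UUIDs:
    #
    #   weights = {u: self._service_weight(md5sum, u) for u in service_uuids}
    #   probe_order = sorted(service_uuids, key=weights.get, reverse=True)
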
1061     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1062         """Return an array of Keep service endpoints, in the order in
1063         which they should be probed when reading or writing data with
1064         the given hash+hints.
1065         """
1066         self.build_services_list(force_rebuild)
1067
1068         sorted_roots = []
1069         # Use the services indicated by the given +K@... remote
1070         # service hints, if any are present and can be resolved to a
1071         # URI.
1072         for hint in locator.hints:
1073             if hint.startswith('K@'):
1074                 if len(hint) == 7:
1075                     sorted_roots.append(
1076                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1077                 elif len(hint) == 29:
1078                     svc = self._gateway_services.get(hint[2:])
1079                     if svc:
1080                         sorted_roots.append(svc['_service_root'])
1081
1082         # Sort the available local services by weight (heaviest first)
1083         # for this locator, and return their service_roots (base URIs)
1084         # in that order.
1085         use_services = self._keep_services
1086         if need_writable:
1087             use_services = self._writable_services
1088         self.using_proxy = self._any_nondisk_services(use_services)
1089         sorted_roots.extend([
1090             svc['_service_root'] for svc in sorted(
1091                 use_services,
1092                 reverse=True,
1093                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1094         _logger.debug("{}: {}".format(locator, sorted_roots))
1095         return sorted_roots
1096
1097     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1098         # roots_map is a dictionary, mapping Keep service root strings
1099         # to KeepService objects.  Poll for Keep services, and add any
1100         # new ones to roots_map.  Return the current list of local
1101         # root strings.
1102         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1103         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1104         for root in local_roots:
1105             if root not in roots_map:
1106                 roots_map[root] = self.KeepService(
1107                     root, self._user_agent_pool,
1108                     upload_counter=self.upload_counter,
1109                     download_counter=self.download_counter,
1110                     headers=headers,
1111                     insecure=self.insecure)
1112         return local_roots
1113
1114     @staticmethod
1115     def _check_loop_result(result):
1116         # KeepClient RetryLoops should save results as a 2-tuple: the
1117         # actual result of the request, and the number of servers available
1118         # to receive the request this round.
1119         # This method returns True if there's a real result, False if
1120         # there are no more servers available, otherwise None.
1121         if isinstance(result, Exception):
1122             return None
1123         result, tried_server_count = result
1124         if (result is not None) and (result is not False):
1125             return True
1126         elif tried_server_count < 1:
1127             _logger.info("No more Keep services to try; giving up")
1128             return False
1129         else:
1130             return None
1131
1132     def get_from_cache(self, loc_s):
1133         """Fetch a block only if it is in the cache; otherwise return None."""
1134         locator = KeepLocator(loc_s)
1135         slot = self.block_cache.get(locator.md5sum)
1136         if slot is not None and slot.ready.is_set():
1137             return slot.get()
1138         else:
1139             return None
1140
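    # Usage sketch: returns the cached bytes with no network round trip,
    # or None if the block is absent or a fetch is still in flight:
    #
    #   data = kc.get_from_cache("acbd18db4cc2f85cedef654fccc4a4d8+3")
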
1141     def refresh_signature(self, loc):
1142         """Ask Keep to get the remote block and return its local signature"""
1143         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1144         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1145
1146     @retry.retry_method
1147     def head(self, loc_s, **kwargs):
1148         return self._get_or_head(loc_s, method="HEAD", **kwargs)
1149
1150     @retry.retry_method
1151     def get(self, loc_s, **kwargs):
1152         return self._get_or_head(loc_s, method="GET", **kwargs)
1153
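    # Typical read path (sketch; assumes kc is a KeepClient and loc_s is
    # a signed locator string obtained from the API server):
    #
    #   data = kc.get(loc_s, num_retries=3)  # full block bytes
    #   ok = kc.head(loc_s)                  # existence check, no body
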
1154     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1155         """Get data from Keep.
1156
1157         This method fetches one or more blocks of data from Keep.  It
1158         sends a request to each Keep service registered with the API
1159         server (or the proxy provided when this client was
1160         instantiated), then each service named in location hints, in
1161         sequence.  As soon as one service provides the data, it's
1162         returned.
1163
1164         Arguments:
1165         * loc_s: A string of one or more comma-separated locators to fetch.
1166           This method returns the concatenation of these blocks.
1167         * num_retries: The number of times to retry GET requests to
1168           *each* Keep server if it returns temporary failures, with
1169           exponential backoff.  Note that, in each loop, the method may try
1170           to fetch data from every available Keep service, along with any
1171           that are named in location hints in the locator.  The default value
1172           is set when the KeepClient is initialized.
1173         """
1174         if ',' in loc_s:
1175             return b''.join(self.get(x) for x in loc_s.split(','))
1176
1177         self.get_counter.add(1)
1178
1179         request_id = (request_id or
1180                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1181                       arvados.util.new_request_id())
1182         if headers is None:
1183             headers = {}
1184         headers['X-Request-Id'] = request_id
1185
1186         slot = None
1187         blob = None
1188         try:
1189             locator = KeepLocator(loc_s)
1190             if method == "GET":
1191                 while slot is None:
1192                     slot, first = self.block_cache.reserve_cache(locator.md5sum)
1193                     if first:
1194                         # Fresh and empty "first time it is used" slot
1195                         break
1196                     if prefetch:
1197                         # This is a prefetch request to fill in the
1198                         # cache; we don't need to wait for the result,
1199                         # so if the block is already in flight, return
1200                         # immediately.  Clear 'slot' to prevent the
1201                         # finally block from calling slot.set().
1202                         slot = None
1203                         return None
1204
1205                     blob = slot.get()
1206                     if blob is not None:
1207                         self.hits_counter.add(1)
1208                         return blob
1209
1210                     # If blob is None, this means either
1211                     #
1212                     # (a) another thread was fetching this block and
1213                     # failed with an error or
1214                     #
1215                     # (b) cache thrashing caused the slot to be
1216                     # evicted (content set to None) by another thread
1217                     # between the call to reserve_cache() and get().
1218                     #
1219                     # We'll handle these cases by reserving a new slot
1220                     # and then doing a full GET request.
1221                     slot = None
1222
1223             self.misses_counter.add(1)
1224
1225             # If the locator has hints specifying a prefix (indicating a
1226             # remote keepproxy) or the UUID of a local gateway service,
1227             # read data from the indicated service(s) instead of the usual
1228             # list of local disk services.
1229             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1230                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1231             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1232                                for hint in locator.hints if (
1233                                        hint.startswith('K@') and
1234                                        len(hint) == 29 and
1235                                        self._gateway_services.get(hint[2:])
1236                                        )])
1237             # Map root URLs to their KeepService objects.
1238             roots_map = {
1239                 root: self.KeepService(root, self._user_agent_pool,
1240                                        upload_counter=self.upload_counter,
1241                                        download_counter=self.download_counter,
1242                                        headers=headers,
1243                                        insecure=self.insecure)
1244                 for root in hint_roots
1245             }
1246
1247             # See #3147 for a discussion of the loop implementation.  Highlights:
1248             # * Refresh the list of Keep services after each failure, in case
1249             #   it's being updated.
1250             # * Retry until we succeed, we're out of retries, or every available
1251             #   service has returned permanent failure.
            # sorted_roots is referenced by the error handling after
            # the loop, so initialize it before the first iteration.
            # roots_map already holds the hint-derived services built
            # above; map_new_services() only adds entries for roots
            # not yet in the map, so it must not be reset here.
            sorted_roots = []
1254             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1255                                    backoff_start=2)
1256             for tries_left in loop:
1257                 try:
1258                     sorted_roots = self.map_new_services(
1259                         roots_map, locator,
1260                         force_rebuild=(tries_left < num_retries),
1261                         need_writable=False,
1262                         headers=headers)
1263                 except Exception as error:
1264                     loop.save_result(error)
1265                     continue
1266
1267                 # Query KeepService objects that haven't returned
1268                 # permanent failure, in our specified shuffle order.
1269                 services_to_try = [roots_map[root]
1270                                    for root in sorted_roots
1271                                    if roots_map[root].usable()]
1272                 for keep_service in services_to_try:
1273                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1274                     if blob is not None:
1275                         break
1276                 loop.save_result((blob, len(services_to_try)))
1277
            # The finally block below caches whatever we ended up with
            # (on failure it stores None, which wakes any other threads
            # waiting on this slot); on success, return the block here.
            if loop.success():
                return blob
1281         finally:
1282             if slot is not None:
1283                 self.block_cache.set(slot, blob)
1284
1285         # Q: Including 403 is necessary for the Keep tests to continue
1286         # passing, but maybe they should expect KeepReadError instead?
1287         not_founds = sum(1 for key in sorted_roots
1288                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1289         service_errors = ((key, roots_map[key].last_result()['error'])
1290                           for key in sorted_roots)
1291         if not roots_map:
1292             raise arvados.errors.KeepReadError(
1293                 "[{}] failed to read {}: no Keep services available ({})".format(
1294                     request_id, loc_s, loop.last_result()))
1295         elif not_founds == len(sorted_roots):
1296             raise arvados.errors.NotFoundError(
1297                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1298         else:
1299             raise arvados.errors.KeepReadError(
1300                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1301
1302     @retry.retry_method
1303     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1304         """Save data in Keep.
1305
        This method will get a list of Keep services from the API server, and
        send the data to them in parallel threads.  Once the uploads are
        finished, if enough copies are saved, this method returns the most
        recent HTTP response body.  If it cannot save enough copies, this
        method raises KeepWriteError.
1311
1312         Arguments:
1313         * data: The string of data to upload.
1314         * copies: The number of copies that the user requires be saved.
1315           Default 2.
1316         * num_retries: The number of times to retry PUT requests to
1317           *each* Keep server if it returns temporary failures, with
1318           exponential backoff.  The default value is set when the
1319           KeepClient is initialized.
1320         * classes: An optional list of storage class names where copies should
1321           be written.
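
        A minimal usage sketch (the client construction and the
        'default' storage class name are illustrative):

            kc = arvados.KeepClient(api_client=arvados.api('v1'))
            locator = kc.put(b'foo', copies=2, classes=['default'])
            # locator is 'acbd18db4cc2f85cedef654fccc4a4d8+3' plus any
            # permission hint added by the server.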
1322         """
1323
1324         classes = classes or self._default_classes
1325
1326         if not isinstance(data, bytes):
1327             data = data.encode()
1328
1329         self.put_counter.add(1)
1330
1331         data_hash = hashlib.md5(data).hexdigest()
1332         loc_s = data_hash + '+' + str(len(data))
1333         if copies < 1:
1334             return loc_s
1335         locator = KeepLocator(loc_s)
1336
1337         request_id = (request_id or
1338                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1339                       arvados.util.new_request_id())
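        # X-Keep-Desired-Replicas asks each service (notably
        # keepproxy, which writes to multiple backend stores on the
        # client's behalf) to store this many copies of the block.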
1340         headers = {
1341             'X-Request-Id': request_id,
1342             'X-Keep-Desired-Replicas': str(copies),
1343         }
1344         roots_map = {}
1345         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1346                                backoff_start=2)
1347         done_copies = 0
1348         done_classes = []
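        # done_copies and done_classes accumulate across retry
        # iterations, so copies written by earlier attempts still
        # count toward the requested totals (note copies - done_copies
        # in the writer pool setup below).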
1349         for tries_left in loop:
1350             try:
1351                 sorted_roots = self.map_new_services(
1352                     roots_map, locator,
1353                     force_rebuild=(tries_left < num_retries),
1354                     need_writable=True,
1355                     headers=headers)
1356             except Exception as error:
1357                 loop.save_result(error)
1358                 continue
1359
1360             pending_classes = []
1361             if done_classes is not None:
1362                 pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(
                data=data,
                data_hash=data_hash,
                copies=copies - done_copies,
                max_service_replicas=self.max_replicas_per_service,
                timeout=self.current_timeout(num_retries - tries_left),
                classes=pending_classes)
1369             for service_root, ks in [(root, roots_map[root])
1370                                      for root in sorted_roots]:
1371                 if ks.finished():
1372                     continue
1373                 writer_pool.add_task(ks, service_root)
1374             writer_pool.join()
1375             pool_copies, pool_classes = writer_pool.done()
1376             done_copies += pool_copies
1377             if (done_classes is not None) and (pool_classes is not None):
1378                 done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
1382             else:
1383                 # Old keepstore contacted without storage classes support:
1384                 # success is determined only by successful copies.
1385                 #
1386                 # Disable storage classes tracking from this point forward.
1387                 if not self._storage_classes_unsupported_warning:
1388                     self._storage_classes_unsupported_warning = True
1389                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1390                 done_classes = None
1391                 loop.save_result(
1392                     (done_copies >= copies, writer_pool.total_task_nr))
1393
1394         if loop.success():
1395             return writer_pool.response()
1396         if not roots_map:
1397             raise arvados.errors.KeepWriteError(
1398                 "[{}] failed to write {}: no Keep services available ({})".format(
1399                     request_id, data_hash, loop.last_result()))
1400         else:
1401             service_errors = ((key, roots_map[key].last_result()['error'])
1402                               for key in sorted_roots
1403                               if roots_map[key].last_result()['error'])
1404             raise arvados.errors.KeepWriteError(
1405                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1406                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1407
1408     def _block_prefetch_worker(self):
1409         """The background downloader thread."""
1410         while True:
1411             try:
1412                 b = self._prefetch_queue.get()
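                # stop_prefetch_threads() enqueues one None per worker
                # thread as a shutdown sentinel.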
1413                 if b is None:
1414                     return
1415                 self.get(b, prefetch=True)
1416             except Exception:
1417                 _logger.exception("Exception doing block prefetch")
1418
1419     def _start_prefetch_threads(self):
1420         if self._prefetch_threads is None:
1421             with self.lock:
1422                 if self._prefetch_threads is not None:
1423                     return
1424                 self._prefetch_queue = queue.Queue()
1425                 self._prefetch_threads = []
                for _ in range(self.num_prefetch_threads):
1427                     thread = threading.Thread(target=self._block_prefetch_worker)
1428                     self._prefetch_threads.append(thread)
1429                     thread.daemon = True
1430                     thread.start()
1431
1432     def block_prefetch(self, locator):
        """Queue a background download of the given block into the cache.

        This relies on the fact that KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache).  This method
        does not block.
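
        A sketch of the intended pattern (assuming a KeepClient
        instance kc; the locator value and do_other_work() are
        illustrative):

            kc.block_prefetch('acbd18db4cc2f85cedef654fccc4a4d8+3')
            do_other_work()  # overlaps with the background download
            data = kc.get('acbd18db4cc2f85cedef654fccc4a4d8+3')  # likely a cache hit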
1438         """
1439
1440         if self.block_cache.get(locator) is not None:
1441             return
1442
1443         self._start_prefetch_threads()
1444         self._prefetch_queue.put(locator)
1445
1446     def stop_prefetch_threads(self):
1447         with self.lock:
1448             if self._prefetch_threads is not None:
1449                 for t in self._prefetch_threads:
1450                     self._prefetch_queue.put(None)
1451                 for t in self._prefetch_threads:
1452                     t.join()
1453             self._prefetch_threads = None
1454             self._prefetch_queue = None
1455
1456     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1457         """A stub for put().
1458
1459         This method is used in place of the real put() method when
1460         using local storage (see constructor's local_store argument).
1461
1462         copies and num_retries arguments are ignored: they are here
1463         only for the sake of offering the same call signature as
1464         put().
1465
1466         Data stored this way can be retrieved via local_store_get().
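
        A sketch, assuming the client was constructed with the
        local_store argument (the path is illustrative):

            kc = arvados.KeepClient(local_store='/tmp/keep')
            loc = kc.local_store_put(b'foo')
            assert kc.local_store_get(loc) == b'foo'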
1467         """
1468         md5 = hashlib.md5(data).hexdigest()
1469         locator = '%s+%d' % (md5, len(data))
1470         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1471             f.write(data)
1472         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1473                   os.path.join(self.local_store, md5))
1474         return locator
1475
1476     def local_store_get(self, loc_s, num_retries=None):
1477         """Companion to local_store_put()."""
1478         try:
1479             locator = KeepLocator(loc_s)
1480         except ValueError:
1481             raise arvados.errors.NotFoundError(
1482                 "Invalid data locator: '%s'" % loc_s)
1483         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1484             return b''
1485         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1486             return f.read()
1487
1488     def local_store_head(self, loc_s, num_retries=None):
1489         """Companion to local_store_put()."""
1490         try:
1491             locator = KeepLocator(loc_s)
1492         except ValueError:
1493             raise arvados.errors.NotFoundError(
1494                 "Invalid data locator: '%s'" % loc_s)
1495         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1496             return True
1497         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1498             return True